summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-08-18 09:13:02 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-08-18 09:13:02 +0000
commitab6fffad2230229810c995253a6f021e42e03aaf (patch)
treefdee7a99130750af8d7c71d25c358a282e17e405 /qpid/java/broker/src
parent35b5c7fd8c761d41caa88505e8c2fee319e92a84 (diff)
downloadqpid-python-ab6fffad2230229810c995253a6f021e42e03aaf.tar.gz
QPID-5081 : [Java Broker] Refactor Queue Creation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1515079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java59
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java190
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java154
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java46
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java105
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java41
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java31
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java23
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java40
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java18
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java31
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java316
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java28
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java94
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java93
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java45
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java2
36 files changed, 1059 insertions, 469 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 53dd6df599..631490ab5f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -167,11 +167,6 @@ public abstract class AbstractExchange implements Exchange
return _virtualHost;
}
- public QueueRegistry getQueueRegistry()
- {
- return getVirtualHost().getQueueRegistry();
- }
-
public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue)
{
return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index 2873eb31e8..8e9f980e6b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -47,6 +47,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
{
+ private final QueueRegistry _queueRegistry;
private UUID _id;
private VirtualHost _virtualHost;
private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
@@ -55,6 +56,11 @@ public class DefaultExchange implements Exchange
private LogSubject _logSubject;
private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
+ public DefaultExchange(QueueRegistry queueRegistry)
+ {
+ _queueRegistry = queueRegistry;
+ }
+
@Override
public void initialise(UUID id,
@@ -82,7 +88,7 @@ public class DefaultExchange implements Exchange
@Override
public long getBindingCount()
{
- return _virtualHost.getQueueRegistry().getQueues().size();
+ return _virtualHost.getQueues().size();
}
@Override
@@ -146,7 +152,7 @@ public class DefaultExchange implements Exchange
@Override
public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
{
- if(_virtualHost.getQueueRegistry().getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty()))
+ if(_virtualHost.getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty()))
{
return convertToBinding(queue);
}
@@ -207,7 +213,7 @@ public class DefaultExchange implements Exchange
@Override
public List<AMQQueue> route(InboundMessage message)
{
- AMQQueue q = _virtualHost.getQueueRegistry().getQueue(message.getRoutingKey());
+ AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
{
List<AMQQueue> noQueues = Collections.emptyList();
@@ -235,13 +241,13 @@ public class DefaultExchange implements Exchange
@Override
public boolean isBound(AMQShortString routingKey)
{
- return _virtualHost.getQueueRegistry().getQueue(routingKey) != null;
+ return _virtualHost.getQueue(routingKey == null ? null : routingKey.toString()) != null;
}
@Override
public boolean isBound(AMQQueue queue)
{
- return _virtualHost.getQueueRegistry().getQueue(queue.getName()) == queue;
+ return _virtualHost.getQueue(queue.getName()) == queue;
}
@Override
@@ -283,7 +289,7 @@ public class DefaultExchange implements Exchange
@Override
public boolean isBound(String bindingKey)
{
- return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null;
+ return _virtualHost.getQueue(bindingKey) != null;
}
@Override
@@ -320,7 +326,7 @@ public class DefaultExchange implements Exchange
public Collection<Binding> getBindings()
{
List<Binding> bindings = new ArrayList<Binding>();
- for(AMQQueue q : _virtualHost.getQueueRegistry().getQueues())
+ for(AMQQueue q : _virtualHost.getQueues())
{
bindings.add(convertToBinding(q));
}
@@ -330,7 +336,7 @@ public class DefaultExchange implements Exchange
@Override
public void addBindingListener(BindingListener listener)
{
- _virtualHost.getQueueRegistry().addRegistryChangeListener(convertListener(listener));//To change body of implemented methods use File | Settings | File Templates.
+ _queueRegistry.addRegistryChangeListener(convertListener(listener));
}
private QueueRegistry.RegistryChangeListener convertListener(final BindingListener listener)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 75c489c731..d8263a3c80 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -27,6 +27,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -40,20 +41,23 @@ import java.util.concurrent.ConcurrentMap;
public class DefaultExchangeRegistry implements ExchangeRegistry
{
private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
-
/**
* Maps from exchange name to exchange instance
*/
private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
private Exchange _defaultExchange;
- private VirtualHost _host;
+
+ private final VirtualHost _host;
+ private final QueueRegistry _queueRegistry;
+
private final Collection<RegistryChangeListener> _listeners =
Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>());
- public DefaultExchangeRegistry(VirtualHost host)
+ public DefaultExchangeRegistry(VirtualHost host, QueueRegistry queueRegistry)
{
_host = host;
+ _queueRegistry = queueRegistry;
}
public void initialise(ExchangeFactory exchangeFactory) throws AMQException
@@ -61,7 +65,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
//create 'standard' exchanges:
new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore());
- _defaultExchange = new DefaultExchange();
+ _defaultExchange = new DefaultExchange(_queueRegistry);
UUID defaultExchangeId =
UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
index 6fe0607ab2..ae2031bd71 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -89,6 +89,7 @@ public interface Queue extends ConfiguredObject
public static final String EXCLUSIVE = "exclusive";
public static final String MESSAGE_GROUP_KEY = "messageGroupKey";
public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
+ public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
public static final String LVQ_KEY = "lvqKey";
public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
public static final String NO_LOCAL = "noLocal";
@@ -100,6 +101,10 @@ public interface Queue extends ConfiguredObject
public static final String TYPE = "type";
public static final String PRIORITIES = "priorities";
+ public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change
+
+ public static final String FEDERATION_EXCLUDES = "federationExcludes";
+ public static final String FEDERATION_ID = "federationId";
public static final Collection<String> AVAILABLE_ATTRIBUTES =
@@ -134,6 +139,7 @@ public interface Queue extends ConfiguredObject
PRIORITIES
));
+
//children
Collection<Binding> getBindings();
Collection<Consumer> getConsumers();
@@ -144,6 +150,6 @@ public interface Queue extends ConfiguredObject
void visit(QueueEntryVisitor visitor);
void delete();
-
+
void setNotificationListener(QueueNotificationListener listener);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index a84a041b72..26ac99d5bd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -120,7 +120,7 @@ public interface VirtualHost extends ConfiguredObject
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
CONFIG_PATH));
- int CURRENT_CONFIG_VERSION = 2;
+ int CURRENT_CONFIG_VERSION = 3;
//children
Collection<VirtualHostAlias> getAliases();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 157b97cc07..96a7eacb92 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -66,25 +66,6 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
put(DESCRIPTION, String.class);
}});
- static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>();
- static
- {
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH);
-
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT);
-
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY);
-
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY);
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.PRIORITIES, AMQQueueFactory.X_QPID_PRIORITIES);
- }
-
private final AMQQueue _queue;
private final Map<Binding, BindingAdapter> _bindingAdapters =
new HashMap<Binding, BindingAdapter>();
@@ -190,15 +171,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
{
try
{
- QueueRegistry queueRegistry = _queue.getVirtualHost().getQueueRegistry();
- synchronized(queueRegistry)
- {
- _queue.delete();
- if (_queue.isDurable())
- {
- DurableConfigurationStoreHelper.removeQueue(_queue.getVirtualHost().getDurableConfigurationStore(), _queue);
- }
- }
+ _queue.getVirtualHost().removeQueue(_queue);
}
catch(AMQException e)
{
@@ -414,13 +387,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
- return _queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY);
+ return _queue.getAttribute(MESSAGE_GROUP_KEY);
}
else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
{
//We only return the boolean value if message groups are actually in use
- return getAttribute(MESSAGE_GROUP_KEY) == null ? null :
- SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(_queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP));
+ return getAttribute(MESSAGE_GROUP_KEY) == null ? null : _queue.getAttribute(MESSAGE_GROUP_SHARED_GROUPS);
}
else if(LVQ_KEY.equals(name))
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index c09dd9449e..977fd5ae56 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -41,12 +41,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
-import org.apache.qpid.server.connection.IConnectionRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -68,6 +65,7 @@ import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -86,6 +84,7 @@ import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHostListener;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener
{
@@ -203,7 +202,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
private void populateQueues()
{
- Collection<AMQQueue> actualQueues = _virtualHost.getQueueRegistry().getQueues();
+ Collection<AMQQueue> actualQueues = _virtualHost.getQueues();
if ( actualQueues != null )
{
synchronized(_queueAdapters)
@@ -399,7 +398,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null)
{
- attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LVQ_KEY);
+ attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
}
else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null)
{
@@ -415,7 +414,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
String key = MapValueConverter.getStringAttribute(Queue.MESSAGE_GROUP_KEY, attributes);
attributes.remove(Queue.MESSAGE_GROUP_KEY);
- attributes.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, key);
+ attributes.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, key);
}
if (attributes.containsKey(Queue.MESSAGE_GROUP_SHARED_GROUPS))
@@ -423,7 +422,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
if(MapValueConverter.getBooleanAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS, attributes))
{
attributes.remove(Queue.MESSAGE_GROUP_SHARED_GROUPS);
- attributes.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ attributes.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
}
}
@@ -440,15 +439,6 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
attributes.remove(Queue.LIFETIME_POLICY);
attributes.remove(Queue.TIME_TO_LIVE);
- List<String> attrNames = new ArrayList<String>(attributes.keySet());
- for(String attr : attrNames)
- {
- if(QueueAdapter.ATTRIBUTE_MAPPINGS.containsKey(attr))
- {
- attributes.put(QueueAdapter.ATTRIBUTE_MAPPINGS.get(attr),attributes.remove(attr));
- }
- }
-
return createQueue(name, state, durable, exclusive, lifetime, ttl, attributes);
}
@@ -472,33 +462,26 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
owner = authenticatedPrincipal.getName();
}
}
+
+ final boolean autoDelete = lifetime == LifetimePolicy.AUTO_DELETE;
+
try
{
- QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
- synchronized (queueRegistry)
- {
- if(_virtualHost.getQueueRegistry().getQueue(name)!=null)
- {
- throw new IllegalArgumentException("Queue with name "+name+" already exists");
- }
- AMQQueue queue =
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name,
- durable, owner, lifetime == LifetimePolicy.AUTO_DELETE,
- exclusive, _virtualHost, attributes);
- if(durable)
- {
- DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(),
- queue,
- FieldTable.convertToFieldTable(attributes));
- }
- synchronized (_queueAdapters)
- {
- return _queueAdapters.get(queue);
- }
+ AMQQueue queue =
+ _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name,
+ durable, owner, autoDelete, exclusive, autoDelete && exclusive, attributes);
+
+ synchronized (_queueAdapters)
+ {
+ return _queueAdapters.get(queue);
}
}
+ catch(QueueExistsException qe)
+ {
+ throw new IllegalArgumentException("Queue with name "+name+" already exists");
+ }
catch(AMQException e)
{
throw new IllegalArgumentException(e);
@@ -1057,7 +1040,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
if(VirtualHost.QUEUE_COUNT.equals(name))
{
- return _vhost.getQueueRegistry().getQueues().size();
+ return _vhost.getQueues().size();
}
else if(VirtualHost.EXCHANGE_COUNT.equals(name))
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 4f610cc925..cb6a9249d3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -225,7 +225,8 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
void setAlternateExchange(Exchange exchange);
- Map<String, Object> getArguments();
+ Collection<String> getAvailableAttributes();
+ Object getAttribute(String attrName);
void checkCapacity(AMQSessionModel channel);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 1eeb6dccf3..5001c2fd2b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -29,42 +29,31 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class AMQQueueFactory
+public class AMQQueueFactory implements QueueFactory
{
- public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
- public static final String X_QPID_CAPACITY = "x-qpid-capacity";
- public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
- public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
- public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
- public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
- public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
-
- public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
- public static final String X_QPID_DESCRIPTION = "x-qpid-description";
- public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
- public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
- public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
- public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+ public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key";
+
- public static final String DLQ_ROUTING_KEY = "dlq";
- public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
- public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+ public static final String DLQ_ROUTING_KEY = "dlq";
+
+ private final VirtualHost _virtualHost;
+ private final QueueRegistry _queueRegistry;
- private AMQQueueFactory()
+ public AMQQueueFactory(VirtualHost virtualHost, QueueRegistry queueRegistry)
{
+ _virtualHost = virtualHost;
+ _queueRegistry = queueRegistry;
}
private abstract static class QueueProperty
@@ -129,56 +118,56 @@ public class AMQQueueFactory
}
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageAge(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageSize(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageCount(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumQueueDepth(value);
}
},
- new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
+ new QueueLongProperty(Queue.ALERT_REPEAT_GAP)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMinimumAlertRepeatGap(value);
}
},
- new QueueLongProperty(X_QPID_CAPACITY)
+ new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setCapacity(value);
}
},
- new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
+ new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setFlowResumeCapacity(value);
}
},
- new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+ new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS)
{
public void setPropertyValue(AMQQueue queue, int value)
{
@@ -189,13 +178,17 @@ public class AMQQueueFactory
/**
* @param id the id to use.
+ * @param deleteOnNoConsumer
*/
- public static AMQQueue createAMQQueueImpl(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
+ @Override
+ public AMQQueue createAMQQueueImpl(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQSecurityException, AMQException
{
if (id == null)
{
@@ -206,16 +199,11 @@ public class AMQQueueFactory
throw new IllegalArgumentException("Queue name must not be null");
}
- // Access check
- if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
- {
- String description = "Permission denied: queue-name '" + queueName + "'";
- throw new AMQSecurityException(description);
- }
- QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
- boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
- if (isDLQEnabled)
+ QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName);
+
+ boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration);
+ if (createDLQ)
{
validateDLNames(queueName);
}
@@ -226,17 +214,17 @@ public class AMQQueueFactory
if(arguments != null)
{
- if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
+ if(arguments.containsKey(Queue.LVQ_KEY))
{
- conflationKey = (String) arguments.get(QPID_LAST_VALUE_QUEUE_KEY);
+ conflationKey = (String) arguments.get(Queue.LVQ_KEY);
if(conflationKey == null)
{
- conflationKey = QPID_LVQ_KEY;
+ conflationKey = QPID_DEFAULT_LVQ_KEY;
}
}
- else if(arguments.containsKey(X_QPID_PRIORITIES))
+ else if(arguments.containsKey(Queue.PRIORITIES))
{
- Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
+ Object prioritiesObj = arguments.get(Queue.PRIORITIES);
if(prioritiesObj instanceof Number)
{
priorities = ((Number)prioritiesObj).intValue();
@@ -257,33 +245,36 @@ public class AMQQueueFactory
// TODO - should warn here of invalid format
}
}
- else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
+ else if(arguments.containsKey(Queue.SORT_KEY))
{
- sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY);
+ sortingKey = (String)arguments.get(Queue.SORT_KEY);
}
}
AMQQueue q;
if(sortingKey != null)
{
- q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+ q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey);
}
else if(conflationKey != null)
{
- q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
+ q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey);
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
+ q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
}
else
{
- q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
+ q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
}
+ q.setDeleteOnNoConsumers(deleteOnNoConsumer);
+
//Register the new queue
- virtualHost.getQueueRegistry().registerQueue(q);
- q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
+ _queueRegistry.registerQueue(q);
+
+ q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName));
if(arguments != null)
{
@@ -294,21 +285,25 @@ public class AMQQueueFactory
p.setPropertyValue(q, arguments.get(p.getArgumentName().toString()));
}
}
+
+ if(arguments.get(Queue.NO_LOCAL) instanceof Boolean)
+ {
+ q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL));
+ }
+
}
- if(isDLQEnabled)
+ if(createDLQ)
{
final String dlExchangeName = getDeadLetterExchangeName(queueName);
final String dlQueueName = getDeadLetterQueueName(queueName);
- final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
Exchange dlExchange = null;
- final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName());
+ final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
try
{
- dlExchange = virtualHost.createExchange(dlExchangeId,
+ dlExchange = _virtualHost.createExchange(dlExchangeId,
dlExchangeName,
ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(),
true, false, null);
@@ -321,23 +316,19 @@ public class AMQQueueFactory
AMQQueue dlQueue = null;
- synchronized(queueRegistry)
+ synchronized(_queueRegistry)
{
- dlQueue = queueRegistry.getQueue(dlQueueName);
+ dlQueue = _queueRegistry.getQueue(dlQueueName);
if(dlQueue == null)
{
//set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
final Map<String, Object> args = new HashMap<String, Object>();
- args.put(X_QPID_DLQ_ENABLED, false);
- args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
-
- dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
+ args.put(Queue.CREATE_DLQ_ON_CREATION, false);
+ args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
- //enter the dlq in the persistent store
- DurableConfigurationStoreHelper.createQueue(virtualHost.getDurableConfigurationStore(),
- dlQueue,
- FieldTable.convertToFieldTable(args));
+ dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive,
+ false, args);
}
}
@@ -350,11 +341,31 @@ public class AMQQueueFactory
}
q.setAlternateExchange(dlExchange);
}
+ else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
+ {
+
+ final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE);
+ Exchange altExchange;
+ try
+ {
+ altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr));
+ }
+ catch(IllegalArgumentException e)
+ {
+ altExchange = _virtualHost.getExchange(altExchangeAttr);
+ }
+ q.setAlternateExchange(altExchange);
+ }
+
+ if (q.isDurable() && !q.isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q);
+ }
return q;
}
- public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
+ public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws AMQException
{
String queueName = config.getName();
@@ -365,9 +376,9 @@ public class AMQQueueFactory
Map<String, Object> arguments = createQueueArgumentsFromConfig(config);
// we need queues that are defined in config to have deterministic ids.
- UUID id = UUIDGenerator.generateQueueUUID(queueName, host.getName());
+ UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName());
- AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments);
+ AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, false, arguments);
q.configure(config);
return q;
}
@@ -414,21 +425,23 @@ public class AMQQueueFactory
* queue configuration
* @return true if DLQ enabled
*/
- protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+ protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
{
//feature is not to be enabled for temporary queues or when explicitly disabled by argument
- if (!autoDelete)
+ if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
{
- boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+ boolean dlqArgumentPresent = arguments != null
+ && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION);
if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
{
boolean dlqEnabled = true;
if (dlqArgumentPresent)
{
- Object argument = arguments.get(X_QPID_DLQ_ENABLED);
- dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+ Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION);
+ dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue())
+ || (argument instanceof String && Boolean.parseBoolean(argument.toString()));
}
- return dlqEnabled;
+ return dlqEnabled ;
}
}
return false;
@@ -464,31 +477,30 @@ public class AMQQueueFactory
if(config.getArguments() != null && !config.getArguments().isEmpty())
{
- arguments.putAll(config.getArguments());
+ arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments())));
}
if(config.isLVQ() || config.getLVQKey() != null)
{
- arguments.put(QPID_LAST_VALUE_QUEUE, 1);
- arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+ arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey());
}
else if (config.getPriority() || config.getPriorities() > 0)
{
- arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
}
else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
{
- arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
+ arguments.put(Queue.SORT_KEY, config.getQueueSortKey());
}
if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
{
- arguments.put(X_QPID_DLQ_ENABLED, true);
+ arguments.put(Queue.CREATE_DLQ_ON_CREATION, true);
}
if (config.getDescription() != null && !"".equals(config.getDescription()))
{
- arguments.put(X_QPID_DESCRIPTION, config.getDescription());
+ arguments.put(Queue.DESCRIPTION, config.getDescription());
}
if (arguments.isEmpty())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index 27a9e13617..7308433759 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -59,8 +59,9 @@ public class DefaultQueueRegistry implements QueueRegistry
}
}
- public void unregisterQueue(AMQShortString name)
+ public void unregisterQueue(String nameString)
{
+ AMQShortString name = new AMQShortString(nameString);
AMQQueue q = _queueMap.remove(name);
if(q != null)
{
@@ -74,16 +75,11 @@ public class DefaultQueueRegistry implements QueueRegistry
}
}
- public AMQQueue getQueue(AMQShortString name)
+ private AMQQueue getQueue(AMQShortString name)
{
return _queueMap.get(name);
}
- public Collection<AMQShortString> getQueueNames()
- {
- return _queueMap.keySet();
- }
-
public Collection<AMQQueue> getQueues()
{
return _queueMap.values();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
new file mode 100644
index 0000000000..f5bee850c2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.qpid.server.model.Queue;
+
+public class QueueArgumentsConverter
+{
+ public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+ public static final String X_QPID_CAPACITY = "x-qpid-capacity";
+ public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+ public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+ public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+ public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+ public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
+ public static final String QPID_ALERT_COUNT = "qpid.alert_count";
+ public static final String QPID_ALERT_SIZE = "qpid.alert_size";
+ public static final String QPID_ALERT_REPEAT_GAP = "qpid.alert_repeat_gap";
+
+ public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
+
+ public static final String X_QPID_DESCRIPTION = "x-qpid-description";
+ /* public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
+ public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ */
+ public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+
+ public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+ public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+ public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+ public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+ public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+ public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
+ public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude";
+ public static final String QPID_TRACE_ID = "qpid.trace.id";
+
+ public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+
+ /**
+ * No-local queue argument is used to support the no-local feature of Durable Subscribers.
+ */
+ public static final String QPID_NO_LOCAL = "no-local";
+ static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+ static
+ {
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_AGE, Queue.ALERT_THRESHOLD_MESSAGE_AGE);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_SIZE, Queue.ALERT_THRESHOLD_MESSAGE_SIZE);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_COUNT, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_QUEUE_DEPTH, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
+ ATTRIBUTE_MAPPINGS.put(QPID_ALERT_COUNT, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
+ ATTRIBUTE_MAPPINGS.put(QPID_ALERT_SIZE, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
+ ATTRIBUTE_MAPPINGS.put(QPID_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_DELIVERY_COUNT, Queue.MAXIMUM_DELIVERY_ATTEMPTS);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_FLOW_RESUME_CAPACITY, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES);
+
+ ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, Queue.SORT_KEY);
+ ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, Queue.LVQ_KEY);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, Queue.PRIORITIES);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION);
+
+ ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, Queue.CREATE_DLQ_ON_CREATION);
+ ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
+ //ATTRIBUTE_MAPPINGS.put(QPID_SHARED_MSG_GROUP, Queue.MESSAGE_GROUP_SHARED_GROUPS);
+ ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
+ ATTRIBUTE_MAPPINGS.put(QPID_TRACE_EXCLUDE, Queue.FEDERATION_EXCLUDES);
+ ATTRIBUTE_MAPPINGS.put(QPID_TRACE_ID, Queue.FEDERATION_ID);
+ ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
+
+ }
+
+
+ public static Map<String,Object> convertWireArgsToModel(Map<String,Object> wireArguments)
+ {
+ Map<String,Object> modelArguments = new HashMap<String, Object>();
+ if(wireArguments != null)
+ {
+ for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
+ {
+ if(wireArguments.containsKey(entry.getKey()))
+ {
+ modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey()));
+ }
+ }
+ if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
+ {
+ modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
+ }
+ if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
+ {
+ modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,
+ SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(wireArguments.get(QPID_SHARED_MSG_GROUP)));
+ }
+ if(wireArguments.get(X_QPID_DLQ_ENABLED) != null)
+ {
+ modelArguments.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString()));
+ }
+
+ if(wireArguments.get(QPID_NO_LOCAL) != null)
+ {
+ modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString()));
+ }
+
+ }
+ return modelArguments;
+ }
+
+
+ public static Map<String,Object> convertModelArgsToWire(Map<String,Object> modelArguments)
+ {
+ Map<String,Object> wireArguments = new HashMap<String, Object>();
+ for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet())
+ {
+ if(modelArguments.containsKey(entry.getValue()))
+ {
+ wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue()));
+ }
+ }
+
+ if(Boolean.TRUE.equals(modelArguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+ {
+ wireArguments.put(QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ }
+
+ return wireArguments;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
new file mode 100644
index 0000000000..5411a2bc9c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+
+public interface QueueFactory
+{
+ AMQQueue createAMQQueueImpl(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQSecurityException, AMQException;
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index e8c34128e9..bc1d5942bd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -32,11 +31,7 @@ public interface QueueRegistry
void registerQueue(AMQQueue queue);
- void unregisterQueue(AMQShortString name);
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
+ void unregisterQueue(String name);
Collection<AMQQueue> getQueues();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b0ab93162a..e3dbd62b6c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -51,6 +51,7 @@ import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
@@ -68,12 +69,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
- public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
- public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
- private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
private static final String QPID_NO_GROUP = "qpid.no-group";
private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP);
+
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
@@ -237,7 +236,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_exclusive = exclusive;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments);
+ _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
_id = id;
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
@@ -255,19 +254,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
durable, !durable,
_entries.getPriorities() > 0));
- if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+ if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
{
- if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals(SHARED_MSG_GROUP_ARG_VALUE))
+ if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
+ && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
{
- Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP_ARG);
+ Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
+ new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS);
+ _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
else
@@ -358,13 +359,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_alternateExchange = exchange;
}
- /**
- * Arguments used to create this queue. The caller is assured
- * that null will never be returned.
- */
- public Map<String, Object> getArguments()
+
+ @Override
+ public Collection<String> getAvailableAttributes()
+ {
+ return new ArrayList<String>(_arguments.keySet());
+ }
+
+ @Override
+ public Object getAttribute(String attrName)
{
- return _arguments;
+ return _arguments.get(attrName);
}
public boolean isAutoDelete()
@@ -511,7 +516,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_logger.info("Auto-deleteing queue:" + this);
}
- delete();
+ getVirtualHost().removeQueue(this);
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
@@ -1340,7 +1345,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
}
- _virtualHost.getQueueRegistry().unregisterQueue(_name);
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -2282,18 +2286,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
if (description == null)
{
- _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION);
+ _arguments.remove(Queue.DESCRIPTION);
}
else
{
- _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description);
+ _arguments.put(Queue.DESCRIPTION, description);
}
}
@Override
public String getDescription()
{
- return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION);
+ return (String) _arguments.get(Queue.DESCRIPTION);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index 931368cb97..960986ec45 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -167,12 +167,12 @@ public class SecurityManager implements ConfigurationChangeListener
{
String pluginTypeName = getPluginTypeName(accessControl);
_hostPlugins.put(pluginTypeName, accessControl);
-
+
if(_logger.isDebugEnabled())
{
_logger.debug("Added access control to host plugins with name: " + vhostName);
}
-
+
break;
}
}
@@ -366,7 +366,7 @@ public class SecurityManager implements ConfigurationChangeListener
}
public boolean authoriseCreateQueue(final Boolean autoDelete, final Boolean durable, final Boolean exclusive,
- final Boolean nowait, final Boolean passive, final AMQShortString queueName, final String owner)
+ final Boolean nowait, final Boolean passive, final String queueName, final String owner)
{
return checkAllPlugins(new AccessCheck()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
index 6c631fc360..893b371d11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
@@ -212,7 +212,7 @@ public class ObjectProperties
}
public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, Boolean nowait, Boolean passive,
- AMQShortString queueName, String owner)
+ String queueName, String owner)
{
super();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
index efb1e95e99..e9181c0e12 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.store;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -32,6 +33,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
public class DurableConfigurationStoreHelper
{
@@ -46,28 +48,23 @@ public class DurableConfigurationStoreHelper
attributesMap.put(Queue.NAME, queue.getName());
attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+
if (queue.getAlternateExchange() != null)
{
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
}
- else
- {
- attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
- }
- if (attributesMap.containsKey(Queue.ARGUMENTS))
- {
- // We wouldn't need this if createQueueConfiguredObject took only AMQQueue
- Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS);
- currentArgs.putAll(queue.getArguments());
- }
- else
+
+ Collection<String> availableAttrs = queue.getAvailableAttributes();
+
+ for(String attrName : availableAttrs)
{
- attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
+ attributesMap.put(attrName, queue.getAttribute(attrName));
}
+
store.update(queue.getId(), QUEUE, attributesMap);
}
- public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments)
+ public static void createQueue(DurableConfigurationStore store, AMQQueue queue)
throws AMQStoreException
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
@@ -78,11 +75,9 @@ public class DurableConfigurationStoreHelper
{
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
}
- // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments.
- // It would also do away with the need for the if/then/else within updateQueueConfiguredObject
- if (arguments != null)
+ for(String attrName : queue.getAvailableAttributes())
{
- attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
+ attributesMap.put(attrName, queue.getAttribute(attrName));
}
store.create(queue.getId(), QUEUE,attributesMap);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 4e27a008dd..d87431a415 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -58,11 +59,13 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
@@ -95,6 +98,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final ConnectionRegistry _connectionRegistry;
private final DtxRegistry _dtxRegistry;
+ private final AMQQueueFactory _queueFactory;
private volatile State _state = State.INITIALISING;
@@ -136,11 +140,14 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
+
_queueRegistry = new DefaultQueueRegistry(this);
+ _queueFactory = new AMQQueueFactory(this, _queueRegistry);
+
_exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
+ _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
initialiseStatistics();
@@ -298,12 +305,12 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(queueConfiguration);
String queueName = queue.getName();
if (queue.isDurable())
{
- DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null);
+ DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue);
}
//get the exchange name (returns default exchange name if none was specified)
@@ -428,12 +435,102 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
@Override
+ public AMQQueue getQueue(String name)
+ {
+ return _queueRegistry.getQueue(name);
+ }
+
+ @Override
+ public AMQQueue getQueue(UUID id)
+ {
+ return _queueRegistry.getQueue(id);
+ }
+
+ @Override
+ public Collection<AMQQueue> getQueues()
+ {
+ return _queueRegistry.getQueues();
+ }
+
+ @Override
+ public int removeQueue(AMQQueue queue) throws AMQException
+ {
+ synchronized (getQueueRegistry())
+ {
+ int purged = queue.delete();
+
+ getQueueRegistry().unregisterQueue(queue.getName());
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ DurableConfigurationStore store = getDurableConfigurationStore();
+ DurableConfigurationStoreHelper.removeQueue(store, queue);
+ }
+ return purged;
+ }
+ }
+
+ @Override
+ public AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException
+ {
+ // Access check
+ if (!getSecurityManager().authoriseCreateQueue(autoDelete,
+ durable,
+ exclusive,
+ null,
+ null,
+ queueName,
+ owner))
+ {
+ String description = "Permission denied: queue-name '" + queueName + "'";
+ throw new AMQSecurityException(description);
+ }
+
+ synchronized (_queueRegistry)
+ {
+ if(_queueRegistry.getQueue(queueName) != null)
+ {
+ throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
+ }
+ if(id == null)
+ {
+
+ id = UUIDGenerator.generateExchangeUUID(queueName, getName());
+ while(_queueRegistry.getQueue(id) != null)
+ {
+ id = UUID.randomUUID();
+ }
+
+ }
+ else if(_queueRegistry.getQueue(id) != null)
+ {
+ throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName));
+ }
+ return _queueFactory.createAMQQueueImpl(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
+ arguments);
+ }
+
+ }
+
+ @Override
public Exchange getExchange(String name)
{
return _exchangeRegistry.getExchange(name);
}
@Override
+ public Exchange getExchange(UUID id)
+ {
+ return _exchangeRegistry.getExchange(id);
+ }
+
+ @Override
public Exchange getDefaultExchange()
{
return _exchangeRegistry.getDefaultExchange();
@@ -747,7 +844,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
{
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(this, getExchangeRegistry()),
+ new QueueRecoverer(this, getExchangeRegistry(), _queueFactory),
new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()),
new BindingRecoverer(this, getExchangeRegistry())
};
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
index 7cfadbcadf..2d3a620e91 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
@@ -91,7 +91,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
{
_unresolvedDependencies.add(new ExchangeDependency());
}
- _queue = _virtualHost.getQueueRegistry().getQueue(_queueId);
+ _queue = _virtualHost.getQueue(_queueId);
if(_queue == null)
{
_unresolvedDependencies.add(new QueueDependency());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index 3526551073..8d05e719ee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader;
@@ -60,6 +61,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider
currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader());
case 1:
currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
+ case 2:
+ currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
+
case CURRENT_CONFIG_VERSION:
currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
break;
@@ -213,7 +217,7 @@ public class DefaultUpgraderProvider implements UpgraderProvider
UUID queueId = UUID.fromString(queueIdString);
ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId);
return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName()))
- || _virtualHost.getQueueRegistry().getQueue(queueId) != null);
+ || _virtualHost.getQueue(queueId) != null);
}
private boolean isBinding(final String type)
@@ -224,4 +228,39 @@ public class DefaultUpgraderProvider implements UpgraderProvider
}
+ /*
+ * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the
+ * attributes into the map using the model attribute names rather than the wire attribute names
+ */
+ private class Version2Upgrader extends NonNullUpgrader
+ {
+
+ private static final String ARGUMENTS = "arguments";
+
+ @Override
+ public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+ {
+ if(Queue.class.getSimpleName().equals(type))
+ {
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
+ if(attributes.get(ARGUMENTS) instanceof Map)
+ {
+ newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) attributes
+ .get(ARGUMENTS)));
+ }
+ newAttributes.putAll(attributes);
+ attributes = newAttributes;
+ getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes));
+ }
+
+ getNextUpgrader().configuredObject(id,type,attributes);
+ }
+
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index 7929cd3e39..b4fbdf7544 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -20,18 +20,19 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.UnresolvedDependency;
import org.apache.qpid.server.store.UnresolvedObject;
@@ -41,11 +42,15 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
private static final Logger _logger = Logger.getLogger(QueueRecoverer.class);
private final VirtualHost _virtualHost;
private final ExchangeRegistry _exchangeRegistry;
+ private final QueueFactory _queueFactory;
- public QueueRecoverer(final VirtualHost virtualHost, final ExchangeRegistry exchangeRegistry)
+ public QueueRecoverer(final VirtualHost virtualHost,
+ final ExchangeRegistry exchangeRegistry,
+ final QueueFactory queueFactory)
{
_virtualHost = virtualHost;
_exchangeRegistry = exchangeRegistry;
+ _queueFactory = queueFactory;
}
@Override
@@ -101,26 +106,24 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
String queueName = (String) _attributes.get(Queue.NAME);
String owner = (String) _attributes.get(Queue.OWNER);
boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE);
- @SuppressWarnings("unchecked")
- Map<String, Object> queueArgumentsMap = (Map<String, Object>) _attributes.get(Queue.ARGUMENTS);
+
+ Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes);
+ queueArgumentsMap.remove(Queue.NAME);
+ queueArgumentsMap.remove(Queue.OWNER);
+ queueArgumentsMap.remove(Queue.EXCLUSIVE);
+
try
{
- _queue = _virtualHost.getQueueRegistry().getQueue(_id);
+ _queue = _virtualHost.getQueue(_id);
if(_queue == null)
{
- _queue = _virtualHost.getQueueRegistry().getQueue(queueName);
+ _queue = _virtualHost.getQueue(queueName);
}
if (_queue == null)
{
- _queue = AMQQueueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, _virtualHost,
- queueArgumentsMap);
- _virtualHost.getQueueRegistry().registerQueue(_queue);
-
- if (_alternateExchange != null)
- {
- _queue.setAlternateExchange(_alternateExchange);
- }
+ _queue = _queueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive,
+ false, queueArgumentsMap);
}
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index e06e785338..2ebbedccd4 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -21,15 +21,18 @@
package org.apache.qpid.server.virtualhost;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -45,7 +48,23 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
String getName();
- QueueRegistry getQueueRegistry();
+ AMQQueue getQueue(String name);
+
+ AMQQueue getQueue(UUID id);
+
+ Collection<AMQQueue> getQueues();
+
+ int removeQueue(AMQQueue queue) throws AMQException;
+
+ AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException;
+
Exchange createExchange(UUID id,
String exchange,
@@ -58,6 +77,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
void removeExchange(Exchange exchange, boolean force) throws AMQException;
Exchange getExchange(String name);
+ Exchange getExchange(UUID id);
+
Exchange getDefaultExchange();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 3738306f6a..39ca3197b4 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -119,7 +119,7 @@ public class VirtualHostConfigRecoveryHandler implements
}
for(Transaction.Record record : enqueues)
{
- final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
+ final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -179,7 +179,7 @@ public class VirtualHostConfigRecoveryHandler implements
}
for(Transaction.Record record : dequeues)
{
- final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
+ final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -268,7 +268,7 @@ public class VirtualHostConfigRecoveryHandler implements
public void queueEntry(final UUID queueId, long messageId)
{
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
+ AMQQueue queue = _virtualHost.getQueue(queueId);
try
{
if(queue != null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java
new file mode 100644
index 0000000000..54f7d0d172
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugins;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class QueueExistsException extends AMQException
+{
+ private final AMQQueue _existing;
+
+ public QueueExistsException(String name, AMQQueue existing)
+ {
+ super(name);
+ _existing = existing;
+ }
+
+ public AMQQueue getExistingQueue()
+ {
+ return _existing;
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index de6d036f29..dc9ddf7b32 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -113,17 +113,17 @@ public class VirtualHostConfigurationTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(getName());
// Check that atest was a priority queue with 5 priorities
- AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
+ AMQQueue atest = vhost.getQueue("atest");
assertTrue(atest instanceof AMQPriorityQueue);
assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
// Check that ptest was a priority queue with 10 priorities
- AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest"));
+ AMQQueue ptest = vhost.getQueue("ptest");
assertTrue(ptest instanceof AMQPriorityQueue);
assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
// Check that ntest wasn't a priority queue
- AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest"));
+ AMQQueue ntest = vhost.getQueue("ntest");
assertFalse(ntest instanceof AMQPriorityQueue);
}
@@ -146,13 +146,13 @@ public class VirtualHostConfigurationTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(getName());
// Check specifically configured values
- AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
+ AMQQueue aTest = vhost.getQueue("atest");
assertEquals(4, aTest.getMaximumQueueDepth());
assertEquals(5, aTest.getMaximumMessageSize());
assertEquals(6, aTest.getMaximumMessageAge());
// Check default values
- AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest"));
+ AMQQueue bTest = vhost.getQueue("btest");
assertEquals(1, bTest.getMaximumQueueDepth());
assertEquals(2, bTest.getMaximumMessageSize());
assertEquals(3, bTest.getMaximumMessageAge());
@@ -214,10 +214,10 @@ public class VirtualHostConfigurationTest extends QpidTestCase
assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled());
// Get queues
- AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles"));
- AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle"));
- AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2"));
- AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0"));
+ AMQQueue biggles = test.getQueue("biggles");
+ AMQQueue beetle = test.getQueue("beetle");
+ AMQQueue r2d2 = extra.getQueue("r2d2");
+ AMQQueue c3p0 = extra.getQueue("c3p0");
// Disabled specifically for this queue, overriding virtualhost setting
assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index f2a64381df..7adec3d595 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -75,7 +75,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testNoRoute() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -86,7 +87,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testDirectMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -108,7 +110,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testStarMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
@@ -139,7 +141,7 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashMatch() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
_exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
@@ -190,7 +192,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMidHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
routeMessage("a.c.d.b",0l);
@@ -215,7 +218,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMatchafterHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
@@ -253,7 +257,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashAfterHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -274,7 +279,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testHashHash() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -295,7 +301,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testSubMatchFails() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null));
int queueCount = routeMessage("a.b.c",0l);
@@ -326,7 +333,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMoreRouting() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -339,7 +347,8 @@ public class TopicExchangeTest extends QpidTestCase
public void testMoreQueue() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
+ AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+ false, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 693fd16b9f..a468fa072b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Collections;
import junit.framework.AssertionFailedError;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import java.util.ArrayList;
+import org.apache.qpid.server.model.Queue;
import static org.mockito.Mockito.when;
@@ -38,9 +38,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
@Override
public void setUp() throws Exception
{
- FieldTable arguments = new FieldTable();
- arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3);
- setArguments(arguments);
+ setArguments(Collections.singletonMap(Queue.PRIORITIES,(Object)3));
super.setUp();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index c8e0e53d75..62c9b4c46d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,91 +20,213 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.configuration.XMLConfiguration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class AMQQueueFactoryTest extends QpidTestCase
{
private QueueRegistry _queueRegistry;
private VirtualHost _virtualHost;
- private Broker _broker;
+ private AMQQueueFactory _queueFactory;
+ private List<AMQQueue> _queues;
+ private QueueConfiguration _queueConfiguration;
@Override
public void setUp() throws Exception
{
super.setUp();
- BrokerTestHelper.setUp();
- XMLConfiguration configXml = new XMLConfiguration();
- configXml.addProperty("store.class", TestableMemoryMessageStore.class.getName());
+ _queues = new ArrayList<AMQQueue>();
+ _virtualHost = mock(VirtualHost.class);
- _broker = BrokerTestHelper.createBrokerMock();
- if (getName().equals("testDeadLetterQueueDoesNotInheritDLQorMDCSettings"))
- {
- when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5);
- when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true);
- }
+ VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class);
+ when(_virtualHost.getConfiguration()).thenReturn(vhostConfig);
+ _queueConfiguration = mock(QueueConfiguration.class);
+ when(vhostConfig.getQueueConfiguration(anyString())).thenReturn(_queueConfiguration);
+ LogActor logActor = mock(LogActor.class);
+ CurrentActor.set(logActor);
+ RootMessageLogger rootLogger = mock(RootMessageLogger.class);
+ when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
+ DurableConfigurationStore store = mock(DurableConfigurationStore.class);
+ when(_virtualHost.getDurableConfigurationStore()).thenReturn(store);
+
+ mockExchangeCreation();
+ mockQueueRegistry();
+ delegateVhostQueueCreation();
+
+ when(_virtualHost.getQueues()).thenReturn(_queues);
+
+
+ _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry);
- _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, _broker));
- _queueRegistry = _virtualHost.getQueueRegistry();
}
+ private void delegateVhostQueueCreation() throws AMQException
+ {
+ final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class);
+ final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class);
+
+ when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(),
+ autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then(
+ new Answer<AMQQueue>()
+ {
+ @Override
+ public AMQQueue answer(InvocationOnMock invocation) throws Throwable
+ {
+ return _queueFactory.createAMQQueueImpl(id.getValue(),
+ queueName.getValue(),
+ durable.getValue(),
+ owner.getValue(),
+ autoDelete.getValue(),
+ exclusive.getValue(),
+ deleteOnNoConsumer.getValue(),
+ arguments.getValue());
+ }
+ }
+ );
+ }
+
+ private void mockQueueRegistry()
+ {
+ _queueRegistry = mock(QueueRegistry.class);
+
+ final ArgumentCaptor<AMQQueue> capturedQueue = ArgumentCaptor.forClass(AMQQueue.class);
+ doAnswer(new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ AMQQueue queue = capturedQueue.getValue();
+ when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue);
+ when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue);
+ when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue);
+ when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue);
+ _queues.add(queue);
+
+ return null;
+ }
+ }).when(_queueRegistry).registerQueue(capturedQueue.capture());
+ }
+
+ private void mockExchangeCreation() throws AMQException
+ {
+ final ArgumentCaptor<UUID> idCapture = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> exchangeNameCapture = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<String> type = ArgumentCaptor.forClass(String.class);
+
+ when(_virtualHost.createExchange(idCapture.capture(), exchangeNameCapture.capture(), type.capture(),
+ anyBoolean(), anyBoolean(), anyString())).then(
+ new Answer<Exchange>()
+ {
+ @Override
+ public Exchange answer(InvocationOnMock invocation) throws Throwable
+ {
+ final String name = exchangeNameCapture.getValue();
+ final UUID id = idCapture.getValue();
+
+ final Exchange exchange = mock(Exchange.class);
+ ExchangeType exType = mock(ExchangeType.class);
+
+ when(exchange.getName()).thenReturn(name);
+ when(exchange.getId()).thenReturn(id);
+ when(exchange.getType()).thenReturn(exType);
+ final String typeName = type.getValue();
+ when(exType.getType()).thenReturn(typeName);
+ when(exType.getName()).thenReturn(new AMQShortString(typeName));
+
+ when(_virtualHost.getExchange(eq(name))).thenReturn(exchange);
+ when(_virtualHost.getExchange(eq(id))).thenReturn(exchange);
+
+ final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class);
+
+ when(exchange.addBinding(anyString(),queue.capture(),anyMap())).then(new Answer<Boolean>() {
+
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable
+ {
+ when(exchange.isBound(eq(queue.getValue()))).thenReturn(true);
+ return true;
+ }
+ });
+
+ return exchange;
+ }
+ }
+ );
+ }
+
@Override
public void tearDown() throws Exception
{
- try
- {
- _virtualHost.close();
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
+ super.tearDown();
}
private void verifyRegisteredQueueCount(int count)
{
- assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size());
+ assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getQueues().size());
}
private void verifyQueueRegistered(String queueName)
{
- assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName));
+ assertNotNull("Queue " + queueName + " was not created", _virtualHost.getQueue(queueName));
}
public void testPriorityQueueRegistration() throws Exception
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5);
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false,
- false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false,
+ false,
+ false,
+ attributes);
assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
verifyQueueRegistered("testPriorityQueue");
@@ -115,43 +237,42 @@ public class AMQQueueFactoryTest extends QpidTestCase
public void testSimpleQueueRegistration() throws Exception
{
String queueName = getName();
- AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false,
- false, _virtualHost, null);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false,
+ false,
+ false,
+ null);
assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
verifyQueueRegistered(queueName);
//verify that no alternate exchange or DLQ were produced
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange());
- assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not exist", _virtualHost.getQueue(dlQueueName));
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does
* cause the alternate exchange to be set and DLQ to be produced.
* @throws AMQException
*/
public void testDeadLetterQueueEnabled() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ attributes);
Exchange altExchange = queue.getAlternateExchange();
assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
@@ -161,7 +282,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
- AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
@@ -178,17 +299,20 @@ public class AMQQueueFactoryTest extends QpidTestCase
*/
public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception
{
+
String queueName = "testDeadLetterQueueEnabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
+ when(_queueConfiguration.getMaxDeliveryCount()).thenReturn(5);
+ when(_queueConfiguration.isDeadLetterQueueEnabled()).thenReturn(true);
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, null);
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ null);
assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount());
Exchange altExchange = queue.getAlternateExchange();
@@ -199,7 +323,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName));
assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName));
- AMQQueue dlQueue = qReg.getQueue(dlQueueName);
+ AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName);
assertNotNull("The DLQ was not registered as expected", dlQueue);
assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
@@ -210,81 +334,77 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not
* result in the alternate exchange being set and DLQ being created.
* @throws AMQException
*/
public void testDeadLetterQueueDisabled() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false);
String queueName = "testDeadLetterQueueDisabled";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false,
+ false,
+ attributes);
assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName));
- assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should still not exist", _virtualHost.getQueue(dlQueueName));
//only 1 queue should have been registered
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but
* creating an auto-delete queue, does not result in the alternate exchange
* being set and DLQ being created.
* @throws AMQException
*/
public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
- QueueRegistry qReg = _virtualHost.getQueueRegistry();
-
- assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName));
//create an autodelete queue
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false,
+ false,
+ attributes);
assertTrue("Queue should be autodelete", queue.isAutoDelete());
//ensure that the autodelete property overrides the request to enable DLQ
assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName));
- assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName));
+ assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getQueue(dlQueueName));
//only 1 queue should have been registered
verifyRegisteredQueueCount(1);
}
/**
- * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
+ * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
* the desired effect.
*/
public void testMaximumDeliveryCount() throws Exception
{
- final FieldTable fieldTable = new FieldTable();
- fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5);
+ Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
- _virtualHost, FieldTable.convertToMap(fieldTable));
+ final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
+ false,
+ attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount());
@@ -293,13 +413,14 @@ public class AMQQueueFactoryTest extends QpidTestCase
}
/**
- * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
+ * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
* that queue is created with a default maximumDeliveryCount of zero (unless set in config).
*/
public void testMaximumDeliveryCountDefault() throws Exception
{
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
- _virtualHost, null);
+ final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false,
+ false,
+ null);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount());
@@ -314,7 +435,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
{
try
{
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, _virtualHost, null);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false,
+ false,
+ null);
fail("queue with null name can not be created!");
}
catch (Exception e)
@@ -336,10 +459,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
// change DLQ name to make its length bigger than exchange name
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE");
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE");
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
- false, false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
+ false, false, false, attributes);
fail("queue with DLQ name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -362,10 +484,9 @@ public class AMQQueueFactoryTest extends QpidTestCase
// change DLQ name to make its length bigger than exchange name
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE");
setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ");
- FieldTable fieldTable = new FieldTable();
- fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true);
- AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
- false, false, _virtualHost, FieldTable.convertToMap(fieldTable));
+ Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true);
+ _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner",
+ false, false, false, attributes);
fail("queue with DLE name having more than 255 characters can not be created!");
}
catch (Exception e)
@@ -379,16 +500,17 @@ public class AMQQueueFactoryTest extends QpidTestCase
public void testMessageGroupFromConfig() throws Exception
{
- PropertiesConfiguration queueConfig = new PropertiesConfiguration();
- queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey");
- queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1");
+ Map<String,String> arguments = new HashMap<String, String>();
+ arguments.put("qpid.group_header_key","mykey");
+ arguments.put("qpid.shared_msg_group","1");
+ QueueConfiguration qConf = mock(QueueConfiguration.class);
+ when(qConf.getArguments()).thenReturn(arguments);
+ when(qConf.getName()).thenReturn("test");
- final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", queueConfig, _broker);;
- QueueConfiguration qConf = new QueueConfiguration("test", vhostConfig);
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(qConf, _virtualHost);
- assertEquals("mykey", queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY));
- assertEquals("1", queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP));
+ AMQQueue queue = _queueFactory.createAMQQueueImpl(qConf);
+ assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY));
+ assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
}
private String generateStringWithLength(char ch, int length)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index b677ece408..e490db288c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -469,7 +469,14 @@ public class MockAMQQueue implements AMQQueue
}
- public Map<String, Object> getArguments()
+ @Override
+ public Collection<String> getAvailableAttributes()
+ {
+ return null;
+ }
+
+ @Override
+ public Object getAttribute(String attrName)
{
return null;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 0faa796f1c..2328745b83 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -29,12 +29,12 @@ import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
+import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -65,7 +65,7 @@ public class SimpleAMQQueueTest extends QpidTestCase
private AMQShortString _routingKey = new AMQShortString("routing key");
private DirectExchange _exchange;
private MockSubscription _subscription = new MockSubscription();
- private FieldTable _arguments = null;
+ private Map<String,Object> _arguments = null;
private MessagePublishInfo info = new MessagePublishInfo()
{
@@ -103,8 +103,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
_virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(),
- false, false, _virtualHost, FieldTable.convertToMap(_arguments));
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(),
+ false, false, false, _arguments);
_exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString());
}
@@ -127,8 +127,11 @@ public class SimpleAMQQueueTest extends QpidTestCase
public void testCreateQueue() throws AMQException
{
_queue.stop();
- try {
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments));
+ try
+ {
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null,
+ false, _owner.asString(), false,
+ false, false, _arguments);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -137,7 +140,8 @@ public class SimpleAMQQueueTest extends QpidTestCase
e.getMessage().contains("name"));
}
- try {
+ try
+ {
_queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP);
assertNull("Queue was created", _queue);
}
@@ -147,8 +151,10 @@ public class SimpleAMQQueueTest extends QpidTestCase
e.getMessage().contains("Host"));
}
- _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false,
- false, _virtualHost, FieldTable.convertToMap(_arguments));
+ _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(),
+ "differentName", false,
+ _owner.asString(), false,
+ false, false, _arguments);
assertNotNull("Queue was not created", _queue);
}
@@ -1225,12 +1231,12 @@ public class SimpleAMQQueueTest extends QpidTestCase
return _subscription;
}
- public FieldTable getArguments()
+ public Map<String,Object> getArguments()
{
return _arguments;
}
- public void setArguments(FieldTable arguments)
+ public void setArguments(Map<String,Object> arguments)
{
_arguments = arguments;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index 4abb7233dc..c115af5a38 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -50,8 +50,8 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase
try
{
- SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "test", false,
- "owner", false, false, test, null);
+ SimpleAMQQueue queue = (SimpleAMQQueue)
+ test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null);
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 67cf0780da..e9ad4ba236 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -52,6 +52,9 @@ import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRec
import org.apache.qpid.server.store.Transaction.Record;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase
{
@@ -178,7 +181,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testBindQueue() throws Exception
{
- AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
+ AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -197,7 +200,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUnbindQueue() throws Exception
{
- AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
+ AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
_exchange, FieldTable.convertToMap(_bindingArgs));
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -212,8 +215,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testCreateQueueAMQQueue() throws Exception
{
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, null);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, null);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
Map<String, Object> queueAttributes = new HashMap<String, Object>();
@@ -225,13 +228,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testCreateQueueAMQQueueFieldTable() throws Exception
{
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
@@ -241,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
}
@@ -250,8 +252,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
Exchange alternateExchange = createTestAlternateExchange();
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, null);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
reopenStore();
@@ -275,16 +277,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUpdateQueueExclusivity() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
- queue = createTestQueue(getName(), getName() + "Owner", false);
- when(queue.getArguments()).thenReturn(attributes);
+ queue = createTestQueue(getName(), getName() + "Owner", false, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
@@ -295,7 +296,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -304,17 +305,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testUpdateQueueAlternateExchange() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// update the queue to have exclusive=false
Exchange alternateExchange = createTestAlternateExchange();
- queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange);
- when(queue.getArguments()).thenReturn(attributes);
+ queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes);
DurableConfigurationStoreHelper.updateQueue(_configStore, queue);
@@ -325,7 +324,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
- queueAttributes.put(Queue.ARGUMENTS, attributes);
+ queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -334,12 +333,11 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
public void testRemoveQueue() throws Exception
{
// create queue
- AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
- attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
- FieldTable arguments = FieldTable.convertToFieldTable(attributes);
- DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments);
+ attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+ DurableConfigurationStoreHelper.createQueue(_configStore, queue);
// remove queue
DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
@@ -349,12 +347,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
anyMap());
}
- private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException
+ private AMQQueue createTestQueue(String queueName,
+ String queueOwner,
+ boolean exclusive,
+ final Map<String, Object> arguments) throws AMQStoreException
{
- return createTestQueue(queueName, queueOwner, exclusive, null);
+ return createTestQueue(queueName, queueOwner, exclusive, null, arguments);
}
- private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException
+ private AMQQueue createTestQueue(String queueName,
+ String queueOwner,
+ boolean exclusive,
+ Exchange alternateExchange,
+ final Map<String, Object> arguments) throws AMQStoreException
{
AMQQueue queue = mock(AMQQueue.class);
when(queue.getName()).thenReturn(queueName);
@@ -363,6 +368,23 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
+ if(arguments != null && !arguments.isEmpty())
+ {
+ when(queue.getAvailableAttributes()).thenReturn(arguments.keySet());
+ final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
+ when(queue.getAttribute(requestedAttribute.capture())).then(
+ new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ String attrName = requestedAttribute.getValue();
+ return arguments.get(attrName);
+ }
+ });
+ }
+
return queue;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index caf6acb4bb..cb1fc2737d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -181,9 +181,8 @@ public class BrokerTestHelper
public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException
{
- SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null,
- false, false, virtualHost, Collections.<String, Object>emptyMap());
- virtualHost.getQueueRegistry().registerQueue(queue);
+ SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
+ false, false, false, Collections.<String, Object>emptyMap());
return queue;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index 2d3483f078..66a71c562f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -45,6 +45,8 @@ import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.*;
import org.apache.qpid.server.security.SecurityManager;
@@ -82,7 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private DurableConfigurationStore _store;
private ExchangeFactory _exchangeFactory;
private ExchangeRegistry _exchangeRegistry;
- private QueueRegistry _queueRegistry;
+ private QueueFactory _queueFactory;
@Override
public void setUp() throws Exception
@@ -105,6 +107,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
+ when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue);
final ArgumentCaptor<Exchange> registeredExchange = ArgumentCaptor.forClass(Exchange.class);
doAnswer(new Answer()
@@ -114,37 +117,68 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
public Object answer(final InvocationOnMock invocation) throws Throwable
{
Exchange exchange = registeredExchange.getValue();
- when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange);
- when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange);
+ when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange);
+ when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange);
return null;
}
}).when(_exchangeRegistry).registerExchange(registeredExchange.capture());
- _queueRegistry = mock(QueueRegistry.class);
- when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry);
+ final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class);
+ final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class);
+ final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class);
- when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+ _queueFactory = mock(QueueFactory.class);
- final ArgumentCaptor<AMQQueue> registeredQueue = ArgumentCaptor.forClass(AMQQueue.class);
- doAnswer(new Answer()
- {
+ when(_queueFactory.createAMQQueueImpl(idArg.capture(), queueArg.capture(),
+ anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then(
+ new Answer()
+ {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable
- {
- AMQQueue queue = registeredQueue.getValue();
- when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue);
- when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue);
- return null;
- }
- }).when(_queueRegistry).registerQueue(registeredQueue.capture());
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ final AMQQueue queue = mock(AMQQueue.class);
+
+ final String queueName = queueArg.getValue();
+ final UUID queueId = idArg.getValue();
+
+ when(queue.getName()).thenReturn(queueName);
+ when(queue.getId()).thenReturn(queueId);
+ when(_vhost.getQueue(eq(queueName))).thenReturn(queue);
+ when(_vhost.getQueue(eq(queueId))).thenReturn(queue);
+
+ final ArgumentCaptor<Exchange> altExchangeArg = ArgumentCaptor.forClass(Exchange.class);
+ doAnswer(
+ new Answer()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ final Exchange value = altExchangeArg.getValue();
+ when(queue.getAlternateExchange()).thenReturn(value);
+ return null;
+ }
+ }
+ ).when(queue).setAlternateExchange(altExchangeArg.capture());
+
+ Map args = argsArg.getValue();
+ if(args.containsKey(Queue.ALTERNATE_EXCHANGE))
+ {
+ final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
+ final Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
+ queue.setAlternateExchange(exchange);
+ }
+ return queue;
+ }
+ });
_exchangeFactory = mock(ExchangeFactory.class);
+
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(_vhost, _exchangeRegistry),
+ new QueueRecoverer(_vhost, _exchangeRegistry, _queueFactory),
new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory),
new BindingRecoverer(_vhost, _exchangeRegistry)
};
@@ -356,24 +390,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
final UUID queueId = new UUID(1, 0);
final UUID exchangeId = new UUID(2, 0);
- /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than
- queue creation being on vhost - yuck! */
- SecurityManager securityManager = mock(SecurityManager.class);
- when(_vhost.getSecurityManager()).thenReturn(securityManager);
- when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),
- any(AMQShortString.class),anyString())).thenReturn(true);
- VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class);
- when(_vhost.getConfiguration()).thenReturn(configuration);
- QueueConfiguration queueConfiguration = mock(QueueConfiguration.class);
- when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration);
- LogActor logActor = mock(LogActor.class);
- CurrentActor.set(logActor);
- RootMessageLogger rootLogger = mock(RootMessageLogger.class);
- when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
- /* end of queue creation mock hackery */
-
final Exchange customExchange = mock(Exchange.class);
+ when(customExchange.getId()).thenReturn(exchangeId);
+ when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME);
+
when(_exchangeFactory.createExchange(eq(exchangeId),
eq(CUSTOM_EXCHANGE_NAME),
eq(HeadersExchange.TYPE.getType()),
@@ -390,7 +411,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.completeConfigurationRecovery();
- assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange);
+ assertEquals(customExchange, _vhost.getQueue(queueId).getAlternateExchange());
}
private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 6769c1c2fc..1ca7ff1b65 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.virtualhost;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -28,6 +29,7 @@ import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
@@ -119,6 +121,43 @@ public class MockVirtualHost implements VirtualHost
}
@Override
+ public AMQQueue getQueue(String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue getQueue(UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<AMQQueue> getQueues()
+ {
+ return null;
+ }
+
+ @Override
+ public int removeQueue(AMQQueue queue) throws AMQException
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQException
+ {
+ return null;
+ }
+
+ @Override
public Exchange createExchange(UUID id,
String exchange,
String type,
@@ -141,6 +180,12 @@ public class MockVirtualHost implements VirtualHost
}
@Override
+ public Exchange getExchange(UUID id)
+ {
+ return null;
+ }
+
+ @Override
public Exchange getDefaultExchange()
{
return null;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index e72196c383..03cb483e40 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -244,7 +244,7 @@ public class StandardVirtualHostTest extends QpidTestCase
VirtualHost vhost = createVirtualHost(vhostName, config);
assertNotNull("virtualhost should exist", vhost);
- AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName);
+ AMQQueue queue = vhost.getQueue(queueName);
assertNotNull("queue should exist", queue);
Exchange defaultExch = vhost.getDefaultExchange();