diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-18 09:13:02 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-18 09:13:02 +0000 |
| commit | ab6fffad2230229810c995253a6f021e42e03aaf (patch) | |
| tree | fdee7a99130750af8d7c71d25c358a282e17e405 /qpid/java/broker-plugins | |
| parent | 35b5c7fd8c761d41caa88505e8c2fee319e92a84 (diff) | |
| download | qpid-python-ab6fffad2230229810c995253a6f021e42e03aaf.tar.gz | |
QPID-5081 : [Java Broker] Refactor Queue Creation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1515079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
14 files changed, 250 insertions, 284 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 8e79813216..60211823f8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -20,24 +20,26 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.LinkedHashMap; +import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.AMQUnknownExchangeType; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; @@ -61,6 +63,7 @@ import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; import org.apache.qpid.transport.*; import java.nio.ByteBuffer; @@ -72,11 +75,6 @@ public class ServerSessionDelegate extends SessionDelegate { private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class); - /** - * No-local queue argument is used to support the no-local feature of Durable Subscribers. - */ - private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local"; - public ServerSessionDelegate() { @@ -195,10 +193,9 @@ public class ServerSessionDelegate extends SessionDelegate else { String queueName = method.getQueue(); - QueueRegistry queueRegistry = getQueueRegistry(session); + VirtualHost vhost = getVirtualHost(session); - - final AMQQueue queue = queueRegistry.getQueue(queueName); + final AMQQueue queue = vhost.getQueue(queueName); if(queue == null) { @@ -929,7 +926,6 @@ public class ServerSessionDelegate extends SessionDelegate { VirtualHost virtualHost = getVirtualHost(session); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); if (!method.hasQueue()) { @@ -947,7 +943,7 @@ public class ServerSessionDelegate extends SessionDelegate { method.setBindingKey(method.getQueue()); } - AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + AMQQueue queue = virtualHost.getQueue(method.getQueue()); Exchange exchange = virtualHost.getExchange(method.getExchange()); if(queue == null) { @@ -991,7 +987,6 @@ public class ServerSessionDelegate extends SessionDelegate public void exchangeUnbind(Session session, ExchangeUnbind method) { VirtualHost virtualHost = getVirtualHost(session); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); if (!method.hasQueue()) { @@ -1007,7 +1002,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - AMQQueue queue = queueRegistry.getQueue(method.getQueue()); + AMQQueue queue = virtualHost.getQueue(method.getQueue()); Exchange exchange = virtualHost.getExchange(method.getExchange()); if(queue == null) { @@ -1174,158 +1169,137 @@ public class ServerSessionDelegate extends SessionDelegate private AMQQueue getQueue(Session session, String queue) { - QueueRegistry queueRegistry = getQueueRegistry(session); - return queueRegistry.getQueue(queue); - } - - private QueueRegistry getQueueRegistry(Session session) - { - return getVirtualHost(session).getQueueRegistry(); + return getVirtualHost(session).getQueue(queue); } @Override public void queueDeclare(Session session, final QueueDeclare method) { - VirtualHost virtualHost = getVirtualHost(session); + final VirtualHost virtualHost = getVirtualHost(session); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); String queueName = method.getQueue(); AMQQueue queue; - QueueRegistry queueRegistry = getQueueRegistry(session); //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - synchronized (queueRegistry) + final boolean exclusive = method.getExclusive(); + final boolean autoDelete = method.getAutoDelete(); + + if(method.getPassive()) { + queue = virtualHost.getQueue(queueName); - if (((queue = queueRegistry.getQueue(queueName)) == null)) + if (queue == null) { + String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; - if (method.getPassive()) - { - String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND; + exception(session, method, errorCode, description); - exception(session, method, errorCode, description); + } + else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + { + String description = "Cannot declare queue('" + queueName + "')," + + " as exclusive queue with same name " + + "declared on another session"; + ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; - return; - } - else + exception(session, method, errorCode, description); + + } + } + else + { + + try + { + + String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null; + final String alternateExchangeName = method.getAlternateExchange(); + + + final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(method.getArguments()); + + if(alternateExchangeName != null && alternateExchangeName.length() != 0) { - try - { - queue = createQueue(queueName, method, virtualHost, (ServerSession)session); - if(!method.getExclusive() && method.getAutoDelete()) - { - queue.setDeleteOnNoConsumers(true); - } + arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName); + } - final String alternateExchangeName = method.getAlternateExchange(); - if(alternateExchangeName != null && alternateExchangeName.length() != 0) - { - Exchange alternate = getExchange(session, alternateExchangeName); - queue.setAlternateExchange(alternate); - } + final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()); - if(method.hasArguments() && method.getArguments() != null) - { - if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL)) - { - Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL); - queue.setNoLocal(convertBooleanValue(noLocal)); - } - } + final boolean deleteOnNoConsumer = !exclusive && autoDelete; + queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner, + autoDelete, exclusive, deleteOnNoConsumer, + arguments); - if (queue.isDurable() && !queue.isAutoDelete()) + if (autoDelete && exclusive) + { + final AMQQueue q = queue; + final ServerSession.Task deleteQueueTask = new ServerSession.Task() { - if(method.hasArguments() && method.getArguments() != null) + public void doTask(ServerSession session) { - Map<String,Object> args = method.getArguments(); - FieldTable ftArgs = new FieldTable(); - for(Map.Entry<String, Object> entry : args.entrySet()) + try + { + virtualHost.removeQueue(q); + } + catch (AMQException e) { - ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue()); + exception(session, method, e, "Cannot delete '" + method.getQueue()); } - DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs); } - else + }; + final ServerSession s = (ServerSession) session; + s.addSessionCloseTask(deleteQueueTask); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) throws AMQException { - DurableConfigurationStoreHelper.createQueue(store, queue, null); + s.removeSessionCloseTask(deleteQueueTask); } - } - queueRegistry.registerQueue(queue); - - if (method.hasAutoDelete() - && method.getAutoDelete() - && method.hasExclusive() - && method.getExclusive()) + }); + } + if (exclusive) + { + final AMQQueue q = queue; + final ServerSession.Task removeExclusive = new ServerSession.Task() + { + public void doTask(ServerSession session) { - final AMQQueue q = queue; - final ServerSession.Task deleteQueueTask = new ServerSession.Task() - { - public void doTask(ServerSession session) - { - try - { - q.delete(); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete '" + method.getQueue()); - } - } - }; - final ServerSession s = (ServerSession) session; - s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) throws AMQException - { - s.removeSessionCloseTask(deleteQueueTask); - } - }); + q.setAuthorizationHolder(null); + q.setExclusiveOwningSession(null); } - if (method.hasExclusive() - && method.getExclusive()) + }; + final ServerSession s = (ServerSession) session; + q.setExclusiveOwningSession(s); + s.addSessionCloseTask(removeExclusive); + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) throws AMQException { - final AMQQueue q = queue; - final ServerSession.Task removeExclusive = new ServerSession.Task() - { - public void doTask(ServerSession session) - { - q.setAuthorizationHolder(null); - q.setExclusiveOwningSession(null); - } - }; - final ServerSession s = (ServerSession) session; - q.setExclusiveOwningSession(s); - s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) throws AMQException - { - s.removeSessionCloseTask(removeExclusive); - } - }); + s.removeSessionCloseTask(removeExclusive); } - } - catch (AMQException e) - { - exception(session, method, e, "Cannot declare queue '" + queueName); - } + }); } } - else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + catch(QueueExistsException qe) { + queue = qe.getExistingQueue(); + if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session))) + { String description = "Cannot declare queue('" + queueName + "')," + " as exclusive queue with same name " + "declared on another session"; ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; exception(session, method, errorCode, description); - - return; + } + } + catch (AMQException e) + { + exception(session, method, e, "Cannot declare queue '" + queueName); } } } @@ -1354,20 +1328,6 @@ public class ServerSessionDelegate extends SessionDelegate return false; } - protected AMQQueue createQueue(final String queueName, - final QueueDeclare body, - VirtualHost virtualHost, - final ServerSession session) - throws AMQException - { - String owner = body.getExclusive() ? session.getClientID() : null; - - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner, - body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments()); - - return queue; - } - @Override public void queueDelete(Session session, QueueDelete method) { @@ -1412,12 +1372,7 @@ public class ServerSessionDelegate extends SessionDelegate try { - queue.delete(); - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); - DurableConfigurationStoreHelper.removeQueue(store,queue); - } + virtualHost.removeQueue(queue); } catch (AMQException e) { @@ -1471,7 +1426,14 @@ public class ServerSessionDelegate extends SessionDelegate result.setDurable(queue.isDurable()); result.setExclusive(queue.isExclusive()); result.setAutoDelete(queue.isAutoDelete()); - result.setArguments(queue.getArguments()); + Map<String, Object> arguments = new LinkedHashMap<String, Object>(); + Collection<String> availableAttrs = queue.getAvailableAttributes(); + + for(String attrName : availableAttrs) + { + arguments.put(attrName, queue.getAttribute(attrName)); + } + result.setArguments(QueueArgumentsConverter.convertModelArgsToWire(arguments)); result.setMessageCount(queue.getMessageCount()); result.setSubscriberCount(queue.getConsumerCount()); @@ -1491,7 +1453,7 @@ public class ServerSessionDelegate extends SessionDelegate if(sub == null) { - exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'"); + exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '" + destination + "'"); } else if(sub.isStopped()) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index c6bceb6ac7..63582702cb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -21,9 +21,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.flow.FlowCreditManager; @@ -33,6 +30,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.message.InboundMessage; @@ -40,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.InboundMessageAdapter; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -65,7 +64,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -169,9 +167,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } _queue = queue; - Map<String, Object> arguments = queue.getArguments(); - _traceExclude = (String) arguments.get("qpid.trace.exclude"); - _trace = (String) arguments.get("qpid.trace.id"); + _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES); + _trace = (String) queue.getAttribute(Queue.FEDERATION_ID); String filterLogString = null; _logActor = GenericActor.getInstance(this); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index 6577efe292..4e620327c9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -73,7 +73,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic " args:" + body.getArguments()); } - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue().intern()); + AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString()); if (queue == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 8883422989..5238a41e49 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -74,7 +74,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB else { channel.sync(); - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue()); + AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString()); if (queue == null) { _log.info("No queue for '" + body.getQueue() + "'"); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java index 85f0a6fd3d..ba5692fc6c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java @@ -70,7 +70,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo { AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); MethodRegistry methodRegistry = session.getMethodRegistry(); final AMQChannel channel = session.getChannel(channelId); @@ -115,7 +114,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo else { - AMQQueue queue = queueRegistry.getQueue(queueName); + AMQQueue queue = virtualHost.getQueue(queueName.toString()); if (queue == null) { @@ -141,7 +140,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo } else if (queueName != null) { - AMQQueue queue = queueRegistry.getQueue(queueName); + AMQQueue queue = virtualHost.getQueue(queueName.toString()); if (queue == null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index a8e4e38422..359bd2eb19 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -62,7 +62,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) @@ -73,7 +72,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> final AMQQueue queue; final AMQShortString routingKey; - if (body.getQueue() == null) + final AMQShortString queueName = body.getQueue(); + + if (queueName == null) { queue = channel.getDefaultQueue(); @@ -94,13 +95,13 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(queueName.toString()); routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern(); } if (queue == null) { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist."); } final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString(); final Exchange exch = virtualHost.getExchange(exchangeName); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 9f887d881d..fd547d4bac 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; @@ -44,6 +45,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { @@ -61,8 +63,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); final AMQSessionModel session = protocolConnection.getChannel(channelId); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); final AMQShortString queueName; @@ -87,97 +87,103 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar throw body.getChannelNotFoundException(channelId); } - synchronized (queueRegistry) + if(body.getPassive()) { - queue = queueRegistry.getQueue(queueName); - - AMQSessionModel owningSession = null; - - if (queue != null) + queue = virtualHost.getQueue(queueName.toString()); + if (queue == null) { - owningSession = queue.getExclusiveOwningSession(); + String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; + throw body.getChannelException(AMQConstant.NOT_FOUND, msg); } - - if (queue == null) + else { - if (body.getPassive()) + AMQSessionModel owningSession = queue.getExclusiveOwningSession(); + if (queue.isExclusive() && !queue.isDurable() + && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) { - String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg); + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); } - else + + //set this as the default queue on the channel: + channel.setDefaultQueue(queue); + } + } + else + { + + try + { + + queue = createQueue(queueName, body, virtualHost, protocolConnection); + queue.setAuthorizationHolder(protocolConnection); + + if (body.getExclusive()) { - queue = createQueue(queueName, body, virtualHost, protocolConnection); + queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); queue.setAuthorizationHolder(protocolConnection); - if (queue.isDurable() && !queue.isAutoDelete()) - { - DurableConfigurationStoreHelper.createQueue(store, queue, body.getArguments()); - } - if(body.getAutoDelete()) - { - queue.setDeleteOnNoConsumers(true); - } - queueRegistry.registerQueue(queue); - if (body.getExclusive()) - { - queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); - queue.setAuthorizationHolder(protocolConnection); - if(!body.getDurable()) + if(!body.getDurable()) + { + final AMQQueue q = queue; + final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() { - final AMQQueue q = queue; - final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() + public void doTask(AMQProtocolSession session) throws AMQException { - public void doTask(AMQProtocolSession session) throws AMQException - { - q.setExclusiveOwningSession(null); - } - }; - protocolConnection.addSessionCloseTask(sessionCloseTask); - queue.addQueueDeleteTask(new AMQQueue.Task() { - public void doTask(AMQQueue queue) throws AMQException - { - protocolConnection.removeSessionCloseTask(sessionCloseTask); - } - }); - } + q.setExclusiveOwningSession(null); + } + }; + protocolConnection.addSessionCloseTask(sessionCloseTask); + queue.addQueueDeleteTask(new AMQQueue.Task() { + public void doTask(AMQQueue queue) throws AMQException + { + protocolConnection.removeSessionCloseTask(sessionCloseTask); + } + }); } } - } - else if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); - } - else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive())) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: " - + queue.isExclusive() + " requested " + body.getExclusive() + ")"); } - else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection))) + catch(QueueExistsException qe) { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), " - + "as exclusive queue with same name " - + "declared on another client ID('" - + queue.getOwner() + "') your clientID('" + session.getClientID() + "')"); - } - else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete()) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: " - + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); - } - else if(!body.getPassive() && queue.isDurable() != body.getDurable()) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: " - + queue.isDurable() + " requested " + body.getDurable() + ")"); - } + queue = qe.getExistingQueue(); + AMQSessionModel owningSession = queue.getExclusiveOwningSession(); + + if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection)) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); + } + else if(queue.isExclusive() != body.getExclusive()) + { + + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: " + + queue.isExclusive() + " requested " + body.getExclusive() + ")"); + } + else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection))) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), " + + "as exclusive queue with same name " + + "declared on another client ID('" + + queue.getOwner() + "') your clientID('" + session.getClientID() + "')"); + } + else if(queue.isAutoDelete() != body.getAutoDelete()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: " + + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")"); + } + else if(queue.isDurable() != body.getDurable()) + { + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, + "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: " + + queue.isDurable() + " requested " + body.getDurable() + ")"); + } + } //set this as the default queue on the channel: channel.setDefaultQueue(queue); @@ -204,30 +210,35 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar protected AMQQueue createQueue(final AMQShortString queueName, QueueDeclareBody body, - VirtualHost virtualHost, + final VirtualHost virtualHost, final AMQProtocolSession session) throws AMQException { - final QueueRegistry registry = virtualHost.getQueueRegistry(); - String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null; - Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments()); + final boolean durable = body.getDurable(); + final boolean autoDelete = body.getAutoDelete(); + final boolean exclusive = body.getExclusive(); + + String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null; + + Map<String, Object> arguments = + QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); String queueNameString = AMQShortString.toString(queueName); + final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()); - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()), - queueNameString, body.getDurable(), owner, body.getAutoDelete(), - body.getExclusive(),virtualHost, arguments); + final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete, + exclusive, autoDelete, arguments); - if (body.getExclusive() && !body.getDurable()) + if (exclusive && !durable) { final AMQProtocolSession.Task deleteQueueTask = new AMQProtocolSession.Task() { public void doTask(AMQProtocolSession session) throws AMQException { - if (registry.getQueue(queueName) == queue) + if (virtualHost.getQueue(queueName.toString()) == queue) { - queue.delete(); + virtualHost.removeQueue(queue); } } }; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java index 6f5e0ea992..a39faf2e70 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java @@ -62,7 +62,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB { AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); @@ -82,7 +81,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(body.getQueue().toString()); } if (queue == null) @@ -112,12 +111,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); } - int purged = queue.delete(); - - if (queue.isDurable()) - { - DurableConfigurationStoreHelper.removeQueue(store, queue); - } + int purged = virtualHost.removeQueue(queue); MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java index e925eb7455..ff845d3c16 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java @@ -60,7 +60,6 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod { AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); VirtualHost virtualHost = protocolConnection.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) @@ -84,7 +83,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(body.getQueue().toString()); } if(queue == null) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java index aad5446cb5..20405b82ab 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java @@ -59,8 +59,6 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB { AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - final AMQQueue queue; final AMQShortString routingKey; @@ -87,7 +85,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB } else { - queue = queueRegistry.getQueue(body.getQueue()); + queue = virtualHost.getQueue(body.getQueue().toString()); routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(); } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index ca67b6f79b..35f24afbce 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -110,10 +110,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(destination instanceof QueueDestination) { queue = ((QueueDestination) _destination).getQueue(); - if(queue.getArguments() != null && queue.getArguments().containsKey("topic")) + + if(queue.getAvailableAttributes().contains("topic")) { source.setDistributionMode(StdDistMode.COPY); } + qd = (QueueDestination) destination; Map<Symbol,Filter> filters = source.getFilter(); @@ -194,19 +196,19 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS name = UUID.randomUUID().toString(); } - queue = _vhost.getQueueRegistry().getQueue(name); + queue = _vhost.getQueue(name); Exchange exchange = exchangeDestination.getExchange(); if(queue == null) { - queue = AMQQueueFactory.createAMQQueueImpl( + queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(name, _vhost.getName()), name, isDurable, null, true, true, - _vhost, + true, Collections.EMPTY_MAP); } else @@ -309,11 +311,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { public void doTask(Connection_1_0 session) { - if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) + if (_vhost.getQueue(queueName) == tempQueue) { try { - tempQueue.delete(); + _vhost.removeQueue(tempQueue); } catch (AMQException e) { @@ -417,7 +419,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { try { - queue.delete(); + queue.getVirtualHost().removeQueue(queue); } catch(AMQException e) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ed75a8c165..d3962c779c 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -107,7 +107,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); - AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); + AMQQueue queue = _vhost.getQueue(addr); if(queue != null) { @@ -256,7 +256,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } else { - AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr); + AMQQueue queue = _vhost.getQueue(addr); if(queue != null) { @@ -329,14 +329,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); - final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), - queueName, - false, // durable - null, // owner - false, // autodelete - false, // exclusive - _vhost, - properties); + final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), + queueName, + false, // durable + null, // owner + false, // autodelete + false, // exclusive + false, + properties); @@ -347,11 +347,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu { public void doTask(Connection_1_0 session) { - if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue) + if (_vhost.getQueue(queueName) == tempQueue) { try { - tempQueue.delete(); + _vhost.removeQueue(tempQueue); } catch (AMQException e) { diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java index 67ac1bdc7c..2c88f83405 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; @MBeanDescription("This MBean exposes the broker level management features") public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<VirtualHost> implements ManagedBroker @@ -180,7 +181,8 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi throws IOException, JMException { final Map<String, Object> createArgs = processNewQueueArguments(queueName, owner, originalArguments); - getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, createArgs); + getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, + QueueArgumentsConverter.convertWireArgsToModel(createArgs)); } @@ -196,11 +198,11 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi if (_moveNonExclusiveQueueOwnerToDescription && owner != null) { argumentsCopy = new HashMap<String, Object>(arguments == null ? new HashMap<String, Object>() : arguments); - if (!argumentsCopy.containsKey(AMQQueueFactory.X_QPID_DESCRIPTION)) + if (!argumentsCopy.containsKey(QueueArgumentsConverter.X_QPID_DESCRIPTION)) { - LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION); + LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + QueueArgumentsConverter.X_QPID_DESCRIPTION); - argumentsCopy.put(AMQQueueFactory.X_QPID_DESCRIPTION, owner); + argumentsCopy.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, owner); } else { diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java index e3fac9f711..4240dd5280 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; public class VirtualHostManagerMBeanTest extends TestCase { @@ -79,16 +80,16 @@ public class VirtualHostManagerMBeanTest extends TestCase { _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true); - Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_OWNER); + Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_OWNER); verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); } public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception { - Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); + Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); _virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments); - Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION); + Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_DESCRIPTION); verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments); } |
