summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-08-18 09:13:02 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-08-18 09:13:02 +0000
commitab6fffad2230229810c995253a6f021e42e03aaf (patch)
treefdee7a99130750af8d7c71d25c358a282e17e405 /qpid/java/broker-plugins
parent35b5c7fd8c761d41caa88505e8c2fee319e92a84 (diff)
downloadqpid-python-ab6fffad2230229810c995253a6f021e42e03aaf.tar.gz
QPID-5081 : [Java Broker] Refactor Queue Creation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1515079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java252
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java11
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java179
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java16
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java24
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java10
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java7
14 files changed, 250 insertions, 284 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 8e79813216..60211823f8 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -20,24 +20,26 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.LinkedHashMap;
+import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
@@ -61,6 +63,7 @@ import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
import org.apache.qpid.transport.*;
import java.nio.ByteBuffer;
@@ -72,11 +75,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
- /**
- * No-local queue argument is used to support the no-local feature of Durable Subscribers.
- */
- private static final String QUEUE_ARGUMENT_NO_LOCAL = "no-local";
-
public ServerSessionDelegate()
{
@@ -195,10 +193,9 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
String queueName = method.getQueue();
- QueueRegistry queueRegistry = getQueueRegistry(session);
+ VirtualHost vhost = getVirtualHost(session);
-
- final AMQQueue queue = queueRegistry.getQueue(queueName);
+ final AMQQueue queue = vhost.getQueue(queueName);
if(queue == null)
{
@@ -929,7 +926,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
VirtualHost virtualHost = getVirtualHost(session);
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
if (!method.hasQueue())
{
@@ -947,7 +943,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
method.setBindingKey(method.getQueue());
}
- AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ AMQQueue queue = virtualHost.getQueue(method.getQueue());
Exchange exchange = virtualHost.getExchange(method.getExchange());
if(queue == null)
{
@@ -991,7 +987,6 @@ public class ServerSessionDelegate extends SessionDelegate
public void exchangeUnbind(Session session, ExchangeUnbind method)
{
VirtualHost virtualHost = getVirtualHost(session);
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
if (!method.hasQueue())
{
@@ -1007,7 +1002,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- AMQQueue queue = queueRegistry.getQueue(method.getQueue());
+ AMQQueue queue = virtualHost.getQueue(method.getQueue());
Exchange exchange = virtualHost.getExchange(method.getExchange());
if(queue == null)
{
@@ -1174,158 +1169,137 @@ public class ServerSessionDelegate extends SessionDelegate
private AMQQueue getQueue(Session session, String queue)
{
- QueueRegistry queueRegistry = getQueueRegistry(session);
- return queueRegistry.getQueue(queue);
- }
-
- private QueueRegistry getQueueRegistry(Session session)
- {
- return getVirtualHost(session).getQueueRegistry();
+ return getVirtualHost(session).getQueue(queue);
}
@Override
public void queueDeclare(Session session, final QueueDeclare method)
{
- VirtualHost virtualHost = getVirtualHost(session);
+ final VirtualHost virtualHost = getVirtualHost(session);
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
String queueName = method.getQueue();
AMQQueue queue;
- QueueRegistry queueRegistry = getQueueRegistry(session);
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
- synchronized (queueRegistry)
+ final boolean exclusive = method.getExclusive();
+ final boolean autoDelete = method.getAutoDelete();
+
+ if(method.getPassive())
{
+ queue = virtualHost.getQueue(queueName);
- if (((queue = queueRegistry.getQueue(queueName)) == null))
+ if (queue == null)
{
+ String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND;
- if (method.getPassive())
- {
- String description = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
- ExecutionErrorCode errorCode = ExecutionErrorCode.NOT_FOUND;
+ exception(session, method, errorCode, description);
- exception(session, method, errorCode, description);
+ }
+ else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ {
+ String description = "Cannot declare queue('" + queueName + "'),"
+ + " as exclusive queue with same name "
+ + "declared on another session";
+ ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
- return;
- }
- else
+ exception(session, method, errorCode, description);
+
+ }
+ }
+ else
+ {
+
+ try
+ {
+
+ String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null;
+ final String alternateExchangeName = method.getAlternateExchange();
+
+
+ final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(method.getArguments());
+
+ if(alternateExchangeName != null && alternateExchangeName.length() != 0)
{
- try
- {
- queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
- if(!method.getExclusive() && method.getAutoDelete())
- {
- queue.setDeleteOnNoConsumers(true);
- }
+ arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName);
+ }
- final String alternateExchangeName = method.getAlternateExchange();
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
- {
- Exchange alternate = getExchange(session, alternateExchangeName);
- queue.setAlternateExchange(alternate);
- }
+ final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
- if(method.hasArguments() && method.getArguments() != null)
- {
- if(method.getArguments().containsKey(QUEUE_ARGUMENT_NO_LOCAL))
- {
- Object noLocal = method.getArguments().get(QUEUE_ARGUMENT_NO_LOCAL);
- queue.setNoLocal(convertBooleanValue(noLocal));
- }
- }
+ final boolean deleteOnNoConsumer = !exclusive && autoDelete;
+ queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner,
+ autoDelete, exclusive, deleteOnNoConsumer,
+ arguments);
- if (queue.isDurable() && !queue.isAutoDelete())
+ if (autoDelete && exclusive)
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task deleteQueueTask = new ServerSession.Task()
{
- if(method.hasArguments() && method.getArguments() != null)
+ public void doTask(ServerSession session)
{
- Map<String,Object> args = method.getArguments();
- FieldTable ftArgs = new FieldTable();
- for(Map.Entry<String, Object> entry : args.entrySet())
+ try
+ {
+ virtualHost.removeQueue(q);
+ }
+ catch (AMQException e)
{
- ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
+ exception(session, method, e, "Cannot delete '" + method.getQueue());
}
- DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs);
}
- else
+ };
+ final ServerSession s = (ServerSession) session;
+ s.addSessionCloseTask(deleteQueueTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue) throws AMQException
{
- DurableConfigurationStoreHelper.createQueue(store, queue, null);
+ s.removeSessionCloseTask(deleteQueueTask);
}
- }
- queueRegistry.registerQueue(queue);
-
- if (method.hasAutoDelete()
- && method.getAutoDelete()
- && method.hasExclusive()
- && method.getExclusive())
+ });
+ }
+ if (exclusive)
+ {
+ final AMQQueue q = queue;
+ final ServerSession.Task removeExclusive = new ServerSession.Task()
+ {
+ public void doTask(ServerSession session)
{
- final AMQQueue q = queue;
- final ServerSession.Task deleteQueueTask = new ServerSession.Task()
- {
- public void doTask(ServerSession session)
- {
- try
- {
- q.delete();
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot delete '" + method.getQueue());
- }
- }
- };
- final ServerSession s = (ServerSession) session;
- s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue) throws AMQException
- {
- s.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ q.setAuthorizationHolder(null);
+ q.setExclusiveOwningSession(null);
}
- if (method.hasExclusive()
- && method.getExclusive())
+ };
+ final ServerSession s = (ServerSession) session;
+ q.setExclusiveOwningSession(s);
+ s.addSessionCloseTask(removeExclusive);
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue) throws AMQException
{
- final AMQQueue q = queue;
- final ServerSession.Task removeExclusive = new ServerSession.Task()
- {
- public void doTask(ServerSession session)
- {
- q.setAuthorizationHolder(null);
- q.setExclusiveOwningSession(null);
- }
- };
- final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
- s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue) throws AMQException
- {
- s.removeSessionCloseTask(removeExclusive);
- }
- });
+ s.removeSessionCloseTask(removeExclusive);
}
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot declare queue '" + queueName);
- }
+ });
}
}
- else if (method.getExclusive() && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ catch(QueueExistsException qe)
{
+ queue = qe.getExistingQueue();
+ if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ {
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
+ "declared on another session";
ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
exception(session, method, errorCode, description);
-
- return;
+ }
+ }
+ catch (AMQException e)
+ {
+ exception(session, method, e, "Cannot declare queue '" + queueName);
}
}
}
@@ -1354,20 +1328,6 @@ public class ServerSessionDelegate extends SessionDelegate
return false;
}
- protected AMQQueue createQueue(final String queueName,
- final QueueDeclare body,
- VirtualHost virtualHost,
- final ServerSession session)
- throws AMQException
- {
- String owner = body.getExclusive() ? session.getClientID() : null;
-
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner,
- body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments());
-
- return queue;
- }
-
@Override
public void queueDelete(Session session, QueueDelete method)
{
@@ -1412,12 +1372,7 @@ public class ServerSessionDelegate extends SessionDelegate
try
{
- queue.delete();
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- DurableConfigurationStoreHelper.removeQueue(store,queue);
- }
+ virtualHost.removeQueue(queue);
}
catch (AMQException e)
{
@@ -1471,7 +1426,14 @@ public class ServerSessionDelegate extends SessionDelegate
result.setDurable(queue.isDurable());
result.setExclusive(queue.isExclusive());
result.setAutoDelete(queue.isAutoDelete());
- result.setArguments(queue.getArguments());
+ Map<String, Object> arguments = new LinkedHashMap<String, Object>();
+ Collection<String> availableAttrs = queue.getAvailableAttributes();
+
+ for(String attrName : availableAttrs)
+ {
+ arguments.put(attrName, queue.getAttribute(attrName));
+ }
+ result.setArguments(QueueArgumentsConverter.convertModelArgsToWire(arguments));
result.setMessageCount(queue.getMessageCount());
result.setSubscriberCount(queue.getConsumerCount());
@@ -1491,7 +1453,7 @@ public class ServerSessionDelegate extends SessionDelegate
if(sub == null)
{
- exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '"+destination+"'");
+ exception(session, sfm, ExecutionErrorCode.NOT_FOUND, "not-found: destination '" + destination + "'");
}
else if(sub.isStopped())
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index c6bceb6ac7..63582702cb 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -21,9 +21,6 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -33,6 +30,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.message.InboundMessage;
@@ -40,6 +38,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.InboundMessageAdapter;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -65,7 +64,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -169,9 +167,8 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
_queue = queue;
- Map<String, Object> arguments = queue.getArguments();
- _traceExclude = (String) arguments.get("qpid.trace.exclude");
- _trace = (String) arguments.get("qpid.trace.id");
+ _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
+ _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
String filterLogString = null;
_logActor = GenericActor.getInstance(this);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index 6577efe292..4e620327c9 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -73,7 +73,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
" args:" + body.getArguments());
}
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue().intern());
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
if (queue == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index 8883422989..5238a41e49 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -74,7 +74,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
else
{
channel.sync();
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueueRegistry().getQueue(body.getQueue());
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString());
if (queue == null)
{
_log.info("No queue for '" + body.getQueue() + "'");
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
index 85f0a6fd3d..ba5692fc6c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
@@ -70,7 +70,6 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MethodRegistry methodRegistry = session.getMethodRegistry();
final AMQChannel channel = session.getChannel(channelId);
@@ -115,7 +114,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
else
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
if (queue == null)
{
@@ -141,7 +140,7 @@ public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBo
}
else if (queueName != null)
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
if (queue == null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
index a8e4e38422..359bd2eb19 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
@@ -62,7 +62,6 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
@@ -73,7 +72,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
final AMQQueue queue;
final AMQShortString routingKey;
- if (body.getQueue() == null)
+ final AMQShortString queueName = body.getQueue();
+
+ if (queueName == null)
{
queue = channel.getDefaultQueue();
@@ -94,13 +95,13 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
else
{
- queue = queueRegistry.getQueue(body.getQueue());
+ queue = virtualHost.getQueue(queueName.toString());
routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
}
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
}
final String exchangeName = body.getExchange() == null ? null : body.getExchange().toString();
final Exchange exch = virtualHost.getExchange(exchangeName);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 9f887d881d..fd547d4bac 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
@@ -44,6 +45,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -61,8 +63,6 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
final AMQSessionModel session = protocolConnection.getChannel(channelId);
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
final AMQShortString queueName;
@@ -87,97 +87,103 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
throw body.getChannelNotFoundException(channelId);
}
- synchronized (queueRegistry)
+ if(body.getPassive())
{
- queue = queueRegistry.getQueue(queueName);
-
- AMQSessionModel owningSession = null;
-
- if (queue != null)
+ queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
{
- owningSession = queue.getExclusiveOwningSession();
+ String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
-
- if (queue == null)
+ else
{
- if (body.getPassive())
+ AMQSessionModel owningSession = queue.getExclusiveOwningSession();
+ if (queue.isExclusive() && !queue.isDurable()
+ && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
{
- String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
- throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
}
- else
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+ }
+ else
+ {
+
+ try
+ {
+
+ queue = createQueue(queueName, body, virtualHost, protocolConnection);
+ queue.setAuthorizationHolder(protocolConnection);
+
+ if (body.getExclusive())
{
- queue = createQueue(queueName, body, virtualHost, protocolConnection);
+ queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
queue.setAuthorizationHolder(protocolConnection);
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- DurableConfigurationStoreHelper.createQueue(store, queue, body.getArguments());
- }
- if(body.getAutoDelete())
- {
- queue.setDeleteOnNoConsumers(true);
- }
- queueRegistry.registerQueue(queue);
- if (body.getExclusive())
- {
- queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
- queue.setAuthorizationHolder(protocolConnection);
- if(!body.getDurable())
+ if(!body.getDurable())
+ {
+ final AMQQueue q = queue;
+ final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
{
- final AMQQueue q = queue;
- final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+ public void doTask(AMQProtocolSession session) throws AMQException
{
- public void doTask(AMQProtocolSession session) throws AMQException
- {
- q.setExclusiveOwningSession(null);
- }
- };
- protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new AMQQueue.Task() {
- public void doTask(AMQQueue queue) throws AMQException
- {
- protocolConnection.removeSessionCloseTask(sessionCloseTask);
- }
- });
- }
+ q.setExclusiveOwningSession(null);
+ }
+ };
+ protocolConnection.addSessionCloseTask(sessionCloseTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task() {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ protocolConnection.removeSessionCloseTask(sessionCloseTask);
+ }
+ });
}
}
- }
- else if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
- {
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
- }
- else if(!body.getPassive() && ((queue.isExclusive()) != body.getExclusive()))
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: "
- + queue.isExclusive() + " requested " + body.getExclusive() + ")");
}
- else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
+ catch(QueueExistsException qe)
{
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
- + "as exclusive queue with same name "
- + "declared on another client ID('"
- + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
- }
- else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: "
- + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
- }
- else if(!body.getPassive() && queue.isDurable() != body.getDurable())
- {
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
- "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: "
- + queue.isDurable() + " requested " + body.getDurable() + ")");
- }
+ queue = qe.getExistingQueue();
+ AMQSessionModel owningSession = queue.getExclusiveOwningSession();
+
+ if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
+ else if(queue.isExclusive() != body.getExclusive())
+ {
+
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: "
+ + queue.isExclusive() + " requested " + body.getExclusive() + ")");
+ }
+ else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
+ + "as exclusive queue with same name "
+ + "declared on another client ID('"
+ + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
+ }
+ else if(queue.isAutoDelete() != body.getAutoDelete())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different auto-delete (was: "
+ + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+ }
+ else if(queue.isDurable() != body.getDurable())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '" + queue.getNameShortString() + "' with different durability (was: "
+ + queue.isDurable() + " requested " + body.getDurable() + ")");
+ }
+ }
//set this as the default queue on the channel:
channel.setDefaultQueue(queue);
@@ -204,30 +210,35 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
protected AMQQueue createQueue(final AMQShortString queueName,
QueueDeclareBody body,
- VirtualHost virtualHost,
+ final VirtualHost virtualHost,
final AMQProtocolSession session)
throws AMQException
{
- final QueueRegistry registry = virtualHost.getQueueRegistry();
- String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null;
- Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments());
+ final boolean durable = body.getDurable();
+ final boolean autoDelete = body.getAutoDelete();
+ final boolean exclusive = body.getExclusive();
+
+ String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null;
+
+ Map<String, Object> arguments =
+ QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
String queueNameString = AMQShortString.toString(queueName);
+ final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName());
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()),
- queueNameString, body.getDurable(), owner, body.getAutoDelete(),
- body.getExclusive(),virtualHost, arguments);
+ final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete,
+ exclusive, autoDelete, arguments);
- if (body.getExclusive() && !body.getDurable())
+ if (exclusive && !durable)
{
final AMQProtocolSession.Task deleteQueueTask =
new AMQProtocolSession.Task()
{
public void doTask(AMQProtocolSession session) throws AMQException
{
- if (registry.getQueue(queueName) == queue)
+ if (virtualHost.getQueue(queueName.toString()) == queue)
{
- queue.delete();
+ virtualHost.removeQueue(queue);
}
}
};
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
index 6f5e0ea992..a39faf2e70 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
@@ -62,7 +62,6 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
{
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
@@ -82,7 +81,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
}
else
{
- queue = queueRegistry.getQueue(body.getQueue());
+ queue = virtualHost.getQueue(body.getQueue().toString());
}
if (queue == null)
@@ -112,12 +111,7 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
"Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
}
- int purged = queue.delete();
-
- if (queue.isDurable())
- {
- DurableConfigurationStoreHelper.removeQueue(store, queue);
- }
+ int purged = virtualHost.removeQueue(queue);
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
index e925eb7455..ff845d3c16 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueuePurgeHandler.java
@@ -60,7 +60,6 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
{
AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
VirtualHost virtualHost = protocolConnection.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
@@ -84,7 +83,7 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
- queue = queueRegistry.getQueue(body.getQueue());
+ queue = virtualHost.getQueue(body.getQueue().toString());
}
if(queue == null)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
index aad5446cb5..20405b82ab 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
@@ -59,8 +59,6 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
final AMQQueue queue;
final AMQShortString routingKey;
@@ -87,7 +85,7 @@ public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindB
}
else
{
- queue = queueRegistry.getQueue(body.getQueue());
+ queue = virtualHost.getQueue(body.getQueue().toString());
routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index ca67b6f79b..35f24afbce 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -110,10 +110,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(destination instanceof QueueDestination)
{
queue = ((QueueDestination) _destination).getQueue();
- if(queue.getArguments() != null && queue.getArguments().containsKey("topic"))
+
+ if(queue.getAvailableAttributes().contains("topic"))
{
source.setDistributionMode(StdDistMode.COPY);
}
+
qd = (QueueDestination) destination;
Map<Symbol,Filter> filters = source.getFilter();
@@ -194,19 +196,19 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
name = UUID.randomUUID().toString();
}
- queue = _vhost.getQueueRegistry().getQueue(name);
+ queue = _vhost.getQueue(name);
Exchange exchange = exchangeDestination.getExchange();
if(queue == null)
{
- queue = AMQQueueFactory.createAMQQueueImpl(
+ queue = _vhost.createQueue(
UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
name,
isDurable,
null,
true,
true,
- _vhost,
+ true,
Collections.EMPTY_MAP);
}
else
@@ -309,11 +311,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
public void doTask(Connection_1_0 session)
{
- if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
+ if (_vhost.getQueue(queueName) == tempQueue)
{
try
{
- tempQueue.delete();
+ _vhost.removeQueue(tempQueue);
}
catch (AMQException e)
{
@@ -417,7 +419,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
try
{
- queue.delete();
+ queue.getVirtualHost().removeQueue(queue);
}
catch(AMQException e)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ed75a8c165..d3962c779c 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -107,7 +107,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+ AMQQueue queue = _vhost.getQueue(addr);
if(queue != null)
{
@@ -256,7 +256,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
else
{
- AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+ AMQQueue queue = _vhost.getQueue(addr);
if(queue != null)
{
@@ -329,14 +329,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
- final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
- queueName,
- false, // durable
- null, // owner
- false, // autodelete
- false, // exclusive
- _vhost,
- properties);
+ final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
+ queueName,
+ false, // durable
+ null, // owner
+ false, // autodelete
+ false, // exclusive
+ false,
+ properties);
@@ -347,11 +347,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
{
public void doTask(Connection_1_0 session)
{
- if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
+ if (_vhost.getQueue(queueName) == tempQueue)
{
try
{
- tempQueue.delete();
+ _vhost.removeQueue(tempQueue);
}
catch (AMQException e)
{
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
index 67ac1bdc7c..2c88f83405 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
@MBeanDescription("This MBean exposes the broker level management features")
public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<VirtualHost> implements ManagedBroker
@@ -180,7 +181,8 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi
throws IOException, JMException
{
final Map<String, Object> createArgs = processNewQueueArguments(queueName, owner, originalArguments);
- getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l, createArgs);
+ getConfiguredObject().createQueue(queueName, State.ACTIVE, durable, false, LifetimePolicy.PERMANENT, 0l,
+ QueueArgumentsConverter.convertWireArgsToModel(createArgs));
}
@@ -196,11 +198,11 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi
if (_moveNonExclusiveQueueOwnerToDescription && owner != null)
{
argumentsCopy = new HashMap<String, Object>(arguments == null ? new HashMap<String, Object>() : arguments);
- if (!argumentsCopy.containsKey(AMQQueueFactory.X_QPID_DESCRIPTION))
+ if (!argumentsCopy.containsKey(QueueArgumentsConverter.X_QPID_DESCRIPTION))
{
- LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + AMQQueueFactory.X_QPID_DESCRIPTION);
+ LOGGER.warn("Non-exclusive owner " + owner + " for new queue " + queueName + " moved to " + QueueArgumentsConverter.X_QPID_DESCRIPTION);
- argumentsCopy.put(AMQQueueFactory.X_QPID_DESCRIPTION, owner);
+ argumentsCopy.put(QueueArgumentsConverter.X_QPID_DESCRIPTION, owner);
}
else
{
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
index e3fac9f711..4240dd5280 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
public class VirtualHostManagerMBeanTest extends TestCase
{
@@ -79,16 +80,16 @@ public class VirtualHostManagerMBeanTest extends TestCase
{
_virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true);
- Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_OWNER);
+ Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_OWNER);
verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments);
}
public void testCreateQueueWithOwnerAndDescriptionDiscardsOwner() throws Exception
{
- Map<String, Object> arguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION);
+ Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION);
_virtualHostManagerMBean.createNewQueue(TEST_QUEUE_NAME, TEST_OWNER, true, arguments);
- Map<String, Object> expectedArguments = Collections.singletonMap(AMQQueueFactory.X_QPID_DESCRIPTION, (Object)TEST_DESCRIPTION);
+ Map<String, Object> expectedArguments = Collections.singletonMap(Queue.DESCRIPTION, (Object)TEST_DESCRIPTION);
verify(_mockVirtualHost).createQueue(TEST_QUEUE_NAME, State.ACTIVE, true, false, LifetimePolicy.PERMANENT, 0, expectedArguments);
}