diff options
| author | Robert Gemmell <robbie@apache.org> | 2013-05-15 13:49:25 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2013-05-15 13:49:25 +0000 |
| commit | 554199a88c49bedbb19b10a531bba5d34502aeb9 (patch) | |
| tree | a98506799457d171f346f64a4dccaee862b28b18 /java/broker | |
| parent | 7c3d0bd28828694c8b5d905a4418808ee2d89b9a (diff) | |
| download | qpid-python-554199a88c49bedbb19b10a531bba5d34502aeb9.tar.gz | |
QPID-4847: add support for message group attributes when creating/inspecting queues via the rest interface and web management ui
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1482838 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
5 files changed, 34 insertions, 19 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java index e168e6ea98..0b31f5b81a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java @@ -50,6 +50,8 @@ public class BrokerProperties public static final String PROPERTY_MANAGEMENT_RIGHTS_INFER_ALL_ACCESS = "qpid.broker_jmx_method_rights_infer_all_access"; public static final String PROPERTY_USE_CUSTOM_RMI_SOCKET_FACTORY = "qpid.broker_jmx_use_custom_rmi_socket_factory"; + public static final String PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP = "qpid.broker_default-shared-message-group"; + public static final String PROPERTY_QPID_HOME = "QPID_HOME"; public static final String PROPERTY_QPID_WORK = "QPID_WORK"; diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java index bf703e6fbe..6fe0607ab2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java @@ -88,7 +88,6 @@ public interface Queue extends ConfiguredObject public static final String ALTERNATE_EXCHANGE = "alternateExchange"; public static final String EXCLUSIVE = "exclusive"; public static final String MESSAGE_GROUP_KEY = "messageGroupKey"; - public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; public static final String LVQ_KEY = "lvqKey"; public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; @@ -122,7 +121,6 @@ public interface Queue extends ConfiguredObject LVQ_KEY, SORT_KEY, MESSAGE_GROUP_KEY, - MESSAGE_GROUP_DEFAULT_GROUP, MESSAGE_GROUP_SHARED_GROUPS, MAXIMUM_DELIVERY_ATTEMPTS, QUEUE_FLOW_CONTROL_SIZE_BYTES, diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 0916c4e730..afcce482b6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -308,10 +308,6 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { // TODO } - else if(MESSAGE_GROUP_DEFAULT_GROUP.equals(name)) - { - // TODO - } else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name)) { // TODO @@ -416,15 +412,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } else if(MESSAGE_GROUP_KEY.equals(name)) { - // TODO - } - else if(MESSAGE_GROUP_DEFAULT_GROUP.equals(name)) - { - // TODO + return _queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY); } else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name)) { - // TODO + //We only return the boolean value if message groups are actually in use + return getAttribute(MESSAGE_GROUP_KEY) == null ? null : + SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(_queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP)); } else if(LVQ_KEY.equals(name)) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index b5c34a9f8b..2fa7899572 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -70,6 +70,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -338,6 +339,23 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual throw new IllegalArgumentException("Sort key is not specified for sorted queue"); } } + + if (attributes.containsKey(Queue.MESSAGE_GROUP_KEY)) + { + String key = MapValueConverter.getStringAttribute(Queue.MESSAGE_GROUP_KEY, attributes); + attributes.remove(Queue.MESSAGE_GROUP_KEY); + attributes.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, key); + } + + if (attributes.containsKey(Queue.MESSAGE_GROUP_SHARED_GROUPS)) + { + if(MapValueConverter.getBooleanAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS, attributes)) + { + attributes.remove(Queue.MESSAGE_GROUP_SHARED_GROUPS); + attributes.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + } + } + String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes, null); State state = MapValueConverter.getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE); boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 980145a83e..b0ab93162a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -41,8 +41,8 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.plugins.AbstractConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -65,12 +65,15 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper { + private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); - private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; - private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; - private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group"; + public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; + public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; + public static final String SHARED_MSG_GROUP_ARG_VALUE = "1"; + private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; private static final String QPID_NO_GROUP = "qpid.no-group"; + private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP); // TODO - should make this configurable at the vhost / broker level private static final int DEFAULT_MAX_GROUPS = 255; @@ -254,12 +257,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY)) { - if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1")) + if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals(SHARED_MSG_GROUP_ARG_VALUE)) { - Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP); + Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP_ARG); _messageGroupManager = new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), - defaultGroup == null ? QPID_NO_GROUP : defaultGroup.toString(), + defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), this); } else |
