summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2013-05-15 13:49:25 +0000
committerRobert Gemmell <robbie@apache.org>2013-05-15 13:49:25 +0000
commit554199a88c49bedbb19b10a531bba5d34502aeb9 (patch)
treea98506799457d171f346f64a4dccaee862b28b18 /java/broker
parent7c3d0bd28828694c8b5d905a4418808ee2d89b9a (diff)
downloadqpid-python-554199a88c49bedbb19b10a531bba5d34502aeb9.tar.gz
QPID-4847: add support for message group attributes when creating/inspecting queues via the rest interface and web management ui
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1482838 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/Queue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java18
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java17
5 files changed, 34 insertions, 19 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
index e168e6ea98..0b31f5b81a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
@@ -50,6 +50,8 @@ public class BrokerProperties
public static final String PROPERTY_MANAGEMENT_RIGHTS_INFER_ALL_ACCESS = "qpid.broker_jmx_method_rights_infer_all_access";
public static final String PROPERTY_USE_CUSTOM_RMI_SOCKET_FACTORY = "qpid.broker_jmx_use_custom_rmi_socket_factory";
+ public static final String PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP = "qpid.broker_default-shared-message-group";
+
public static final String PROPERTY_QPID_HOME = "QPID_HOME";
public static final String PROPERTY_QPID_WORK = "QPID_WORK";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
index bf703e6fbe..6fe0607ab2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -88,7 +88,6 @@ public interface Queue extends ConfiguredObject
public static final String ALTERNATE_EXCHANGE = "alternateExchange";
public static final String EXCLUSIVE = "exclusive";
public static final String MESSAGE_GROUP_KEY = "messageGroupKey";
- public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
public static final String LVQ_KEY = "lvqKey";
public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
@@ -122,7 +121,6 @@ public interface Queue extends ConfiguredObject
LVQ_KEY,
SORT_KEY,
MESSAGE_GROUP_KEY,
- MESSAGE_GROUP_DEFAULT_GROUP,
MESSAGE_GROUP_SHARED_GROUPS,
MAXIMUM_DELIVERY_ATTEMPTS,
QUEUE_FLOW_CONTROL_SIZE_BYTES,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 0916c4e730..afcce482b6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -308,10 +308,6 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
{
// TODO
}
- else if(MESSAGE_GROUP_DEFAULT_GROUP.equals(name))
- {
- // TODO
- }
else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
{
// TODO
@@ -416,15 +412,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
- // TODO
- }
- else if(MESSAGE_GROUP_DEFAULT_GROUP.equals(name))
- {
- // TODO
+ return _queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY);
}
else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
{
- // TODO
+ //We only return the boolean value if message groups are actually in use
+ return getAttribute(MESSAGE_GROUP_KEY) == null ? null :
+ SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(_queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP));
}
else if(LVQ_KEY.equals(name))
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index b5c34a9f8b..2fa7899572 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -70,6 +70,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -338,6 +339,23 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
throw new IllegalArgumentException("Sort key is not specified for sorted queue");
}
}
+
+ if (attributes.containsKey(Queue.MESSAGE_GROUP_KEY))
+ {
+ String key = MapValueConverter.getStringAttribute(Queue.MESSAGE_GROUP_KEY, attributes);
+ attributes.remove(Queue.MESSAGE_GROUP_KEY);
+ attributes.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, key);
+ }
+
+ if (attributes.containsKey(Queue.MESSAGE_GROUP_SHARED_GROUPS))
+ {
+ if(MapValueConverter.getBooleanAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS, attributes))
+ {
+ attributes.remove(Queue.MESSAGE_GROUP_SHARED_GROUPS);
+ attributes.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ }
+ }
+
String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes, null);
State state = MapValueConverter.getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE);
boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 980145a83e..b0ab93162a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -41,8 +41,8 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.plugins.AbstractConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -65,12 +65,15 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
{
+
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
- private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
- private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
- private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group";
+ public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+ public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+ public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
+ private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
private static final String QPID_NO_GROUP = "qpid.no-group";
+ private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP);
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
@@ -254,12 +257,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
{
- if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
+ if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals(SHARED_MSG_GROUP_ARG_VALUE))
{
- Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP);
+ Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP_ARG);
_messageGroupManager =
new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
- defaultGroup == null ? QPID_NO_GROUP : defaultGroup.toString(),
+ defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else