diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-18 01:02:27 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-18 01:02:27 +0000 |
| commit | 01cc230ddd9d630c99bd27140297e31213821ae6 (patch) | |
| tree | 017be4b05c032712490ae1f797204422db990e82 /qpid/java/broker-core | |
| parent | 2d3a1f587a2201eed232cdc4b4ee589ea52e3606 (diff) | |
| download | qpid-python-01cc230ddd9d630c99bd27140297e31213821ae6.tar.gz | |
QPID-6396 : [Java Broker] Allow queues to enforce all consumers to be non-destructive
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1660553 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
3 files changed, 25 insertions, 1 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index b69c17dca0..9c6442e7c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -51,6 +51,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; String DEFAULT_FILTERS = "defaultFilters"; + String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers"; String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint"; @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT) @@ -70,6 +71,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute( defaultValue = "NONE" ) ExclusivityPolicy getExclusive(); + @ManagedAttribute( defaultValue = "false" ) + boolean isEnsureNondestructiveConsumers(); + @DerivedAttribute( persist = true ) String getOwner(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 02798e9834..41a95074c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -247,6 +247,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private long _minimumMessageTtl; @ManagedAttributeField private long _maximumMessageTtl; + @ManagedAttributeField + private boolean _ensureNondestructiveConsumers; private final AtomicBoolean _recovering = new AtomicBoolean(true); private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>(); @@ -620,6 +622,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override + public boolean isEnsureNondestructiveConsumers() + { + return _ensureNondestructiveConsumers; + } + + + + @Override public Collection<String> getAvailableAttributes() { return new ArrayList<String>(_arguments.keySet()); @@ -760,6 +770,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } } + + if(_ensureNondestructiveConsumers) + { + optionSet = EnumSet.copyOf(optionSet); + optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES)); + } + QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index ae1fe12c92..367a12057d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -64,12 +64,14 @@ public class QueueArgumentsConverter public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters"; - + public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers"; /** * No-local queue argument is used to support the no-local feature of Durable Subscribers. */ public static final String QPID_NO_LOCAL = "no-local"; + static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>(); + static { ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP); @@ -103,6 +105,7 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY); ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS); + ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS); } |
