diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-28 20:43:20 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-28 20:43:20 +0000 |
| commit | 6f7106e1ab47566a82cf6373b25dff1dc345fb25 (patch) | |
| tree | 6f822e0160f7d681503985228908f3e99397d5bb /qpid/java/broker-core/src | |
| parent | ff054df6bad7466b7aacd5cdabe162397b25c88a (diff) | |
| download | qpid-python-6f7106e1ab47566a82cf6373b25dff1dc345fb25.tar.gz | |
QPID-5934 : [Java Broker] Allow TTL to be overridden on a per-Queue basis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1614166 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
5 files changed, 178 insertions, 13 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index db53f840de..e98f6cd19b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -46,6 +46,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes"; String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes"; String QUEUE_FLOW_STOPPED = "queueFlowStopped"; + String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; + String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; @ManagedAttribute Exchange getAlternateExchange(); @@ -135,16 +137,17 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute( defaultValue = "DEFAULT" ) MessageDurability getMessageDurability(); + @ManagedAttribute + long getMinimumMessageTtl(); + @ManagedAttribute + long getMaximumMessageTtl(); //children Collection<? extends Binding> getBindings(); - // TODO - Undo this commented out line when we stop supporting 1.6 for compilation - // In 1.6 this causes the build to break at AbstractQueue because the 1.6 compiler can't work out that - // the definition in terms of the Consumer implementation meets both this, and the contract for AMQQueue - // Collection<? extends Consumer> getConsumers(); + Collection<? extends Consumer> getConsumers(); //operations diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 2aeca6f45f..11d5cc733f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -230,6 +230,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private String _messageGroupDefaultGroup; @ManagedAttributeField private int _maximumDistinctGroups; + @ManagedAttributeField + private long _minimumMessageTtl; + @ManagedAttributeField + private long _maximumMessageTtl; + private State _state = State.UNINITIALIZED; private final AtomicBoolean _recovering = new AtomicBoolean(true); @@ -547,6 +552,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override + public long getMinimumMessageTtl() + { + return _minimumMessageTtl; + } + + @Override + public long getMaximumMessageTtl() + { + return _maximumMessageTtl; + } + + @Override public Collection<String> getAvailableAttributes() { return new ArrayList<String>(_arguments.keySet()); @@ -967,6 +984,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber; final QueueEntry entry = getEntries().add(message); + updateExpiration(entry); try { @@ -1011,6 +1029,40 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } + private void updateExpiration(final QueueEntry entry) + { + long expiration = entry.getMessage().getExpiration(); + long arrivalTime = entry.getMessage().getArrivalTime(); + if(_minimumMessageTtl != 0l) + { + if(arrivalTime == 0) + { + arrivalTime = System.currentTimeMillis(); + } + if(expiration != 0l) + { + long calculatedExpiration = arrivalTime+_minimumMessageTtl; + if(calculatedExpiration > expiration) + { + entry.setExpiration(calculatedExpiration); + expiration = calculatedExpiration; + } + } + } + if(_maximumMessageTtl != 0l) + { + if(arrivalTime == 0) + { + arrivalTime = System.currentTimeMillis(); + } + long calculatedExpiration = arrivalTime+_maximumMessageTtl; + if(expiration == 0l || expiration > calculatedExpiration) + { + entry.setExpiration(calculatedExpiration); + } + } + } + /** * iterate over consumers and if any is at the end of the queue and can deliver this message, * then deliver the message diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index d984cf8ab4..3ddcb98b53 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -36,4 +36,5 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> QueueEntry getNextValidEntry(); + void setExpiration(long calculatedExpiration); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index e6cde6c934..49644f8d76 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -128,6 +128,11 @@ public abstract class QueueEntryImpl implements QueueEntry } } + public void setExpiration(long expiration) + { + _expiration = expiration; + } + public InstanceProperties getInstanceProperties() { return new EntryInstanceProperties(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index f20285660a..4cbc9ae57b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -21,32 +21,41 @@ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.contains; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Matchers.contains; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.log4j.Logger; + +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.consumer.MockConsumer; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter; -import org.apache.qpid.server.consumer.MockConsumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -859,6 +868,101 @@ abstract class AbstractQueueTestBase extends QpidTestCase verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); } + + public void testMaximumMessageTtl() throws Exception + { + + // Test scenarios where only the maximum TTL has been set + + Map<String,Object> attributes = new HashMap<>(_arguments); + attributes.put(Queue.NAME,"testTtlOverrideMaximumTTl"); + attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 10000l); + + AMQQueue queue = _virtualHost.createQueue(attributes); + + assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 0l)); + + assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 65000l)); + + assertEquals("TTL has been incorrectly overriden", 55000l, getExpirationOnQueue(queue, 50000l, 55000l)); + + long tooLateExpiration = System.currentTimeMillis() + 20000l; + + assertTrue("TTL has not been overriden", tooLateExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration)); + + long acceptableExpiration = System.currentTimeMillis() + 5000l; + + assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration)); + + // Test the scenarios where only the minimum TTL has been set + + attributes = new HashMap<>(_arguments); + attributes.put(Queue.NAME,"testTtlOverrideMinimumTTl"); + attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l); + + queue = _virtualHost.createQueue(attributes); + + assertEquals("TTL has been overriden incorrectly", 0l, getExpirationOnQueue(queue, 50000l, 0l)); + + assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l)); + + assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l)); + + long unacceptableExpiration = System.currentTimeMillis() + 5000l; + + assertTrue("TTL has not been overriden", unacceptableExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration)); + + acceptableExpiration = System.currentTimeMillis() + 20000l; + + assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration)); + + + // Test the scenarios where both the minimum and maximum TTL have been set + + attributes = new HashMap<>(_arguments); + attributes.put(Queue.NAME,"testTtlOverrideBothTTl"); + attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l); + attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 20000l); + + queue = _virtualHost.createQueue(attributes); + + assertEquals("TTL has not been overriden", 70000l, getExpirationOnQueue(queue, 50000l, 0l)); + + assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l)); + + assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l)); + + + + } + + private long getExpirationOnQueue(final AMQQueue queue, long arrivalTime, long expiration) + { + final List<QueueEntry> entries = new ArrayList<>(); + + ServerMessage message = createMessage(1l); + when(message.getArrivalTime()).thenReturn(arrivalTime); + when(message.getExpiration()).thenReturn(expiration); + queue.enqueue(message,null); + queue.visit(new QueueEntryVisitor() + { + @Override + public boolean visit(final QueueEntry entry) + { + entries.add(entry); + return true; + } + }); + assertEquals("Expected only one entry in the queue", 1, entries.size()); + + Long entryExpiration = + (Long) entries.get(0).getInstanceProperties().getProperty(InstanceProperties.Property.EXPIRATION); + + queue.clearQueue(); + entries.clear(); + return entryExpiration; + } + /** * A helper method to put given number of messages into queue * <p> |
