summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-28 20:43:20 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-28 20:43:20 +0000
commit6f7106e1ab47566a82cf6373b25dff1dc345fb25 (patch)
tree6f822e0160f7d681503985228908f3e99397d5bb /qpid/java/broker-core/src
parentff054df6bad7466b7aacd5cdabe162397b25c88a (diff)
downloadqpid-python-6f7106e1ab47566a82cf6373b25dff1dc345fb25.tar.gz
QPID-5934 : [Java Broker] Allow TTL to be overridden on a per-Queue basis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1614166 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java52
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java122
5 files changed, 178 insertions, 13 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index db53f840de..e98f6cd19b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -46,6 +46,8 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
+ String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
+ String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
@ManagedAttribute
Exchange getAlternateExchange();
@@ -135,16 +137,17 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute( defaultValue = "DEFAULT" )
MessageDurability getMessageDurability();
+ @ManagedAttribute
+ long getMinimumMessageTtl();
+ @ManagedAttribute
+ long getMaximumMessageTtl();
//children
Collection<? extends Binding> getBindings();
- // TODO - Undo this commented out line when we stop supporting 1.6 for compilation
- // In 1.6 this causes the build to break at AbstractQueue because the 1.6 compiler can't work out that
- // the definition in terms of the Consumer implementation meets both this, and the contract for AMQQueue
- // Collection<? extends Consumer> getConsumers();
+ Collection<? extends Consumer> getConsumers();
//operations
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 2aeca6f45f..11d5cc733f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -230,6 +230,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private String _messageGroupDefaultGroup;
@ManagedAttributeField
private int _maximumDistinctGroups;
+ @ManagedAttributeField
+ private long _minimumMessageTtl;
+ @ManagedAttributeField
+ private long _maximumMessageTtl;
+
private State _state = State.UNINITIALIZED;
private final AtomicBoolean _recovering = new AtomicBoolean(true);
@@ -547,6 +552,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
+ public long getMinimumMessageTtl()
+ {
+ return _minimumMessageTtl;
+ }
+
+ @Override
+ public long getMaximumMessageTtl()
+ {
+ return _maximumMessageTtl;
+ }
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -967,6 +984,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
final QueueEntry entry = getEntries().add(message);
+ updateExpiration(entry);
try
{
@@ -1011,6 +1029,40 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
+ private void updateExpiration(final QueueEntry entry)
+ {
+ long expiration = entry.getMessage().getExpiration();
+ long arrivalTime = entry.getMessage().getArrivalTime();
+ if(_minimumMessageTtl != 0l)
+ {
+ if(arrivalTime == 0)
+ {
+ arrivalTime = System.currentTimeMillis();
+ }
+ if(expiration != 0l)
+ {
+ long calculatedExpiration = arrivalTime+_minimumMessageTtl;
+ if(calculatedExpiration > expiration)
+ {
+ entry.setExpiration(calculatedExpiration);
+ expiration = calculatedExpiration;
+ }
+ }
+ }
+ if(_maximumMessageTtl != 0l)
+ {
+ if(arrivalTime == 0)
+ {
+ arrivalTime = System.currentTimeMillis();
+ }
+ long calculatedExpiration = arrivalTime+_maximumMessageTtl;
+ if(expiration == 0l || expiration > calculatedExpiration)
+ {
+ entry.setExpiration(calculatedExpiration);
+ }
+ }
+ }
+
/**
* iterate over consumers and if any is at the end of the queue and can deliver this message,
* then deliver the message
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index d984cf8ab4..3ddcb98b53 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -36,4 +36,5 @@ public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
QueueEntry getNextValidEntry();
+ void setExpiration(long calculatedExpiration);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index e6cde6c934..49644f8d76 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -128,6 +128,11 @@ public abstract class QueueEntryImpl implements QueueEntry
}
}
+ public void setExpiration(long expiration)
+ {
+ _expiration = expiration;
+ }
+
public InstanceProperties getInstanceProperties()
{
return new EntryInstanceProperties();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index f20285660a..4cbc9ae57b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -21,32 +21,41 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Matchers.contains;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
-import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -859,6 +868,101 @@ abstract class AbstractQueueTestBase extends QpidTestCase
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
+
+ public void testMaximumMessageTtl() throws Exception
+ {
+
+ // Test scenarios where only the maximum TTL has been set
+
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideMaximumTTl");
+ attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 10000l);
+
+ AMQQueue queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has been incorrectly overriden", 55000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+ long tooLateExpiration = System.currentTimeMillis() + 20000l;
+
+ assertTrue("TTL has not been overriden", tooLateExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
+
+ long acceptableExpiration = System.currentTimeMillis() + 5000l;
+
+ assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+
+ // Test the scenarios where only the minimum TTL has been set
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideMinimumTTl");
+ attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
+
+ queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has been overriden incorrectly", 0l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+ long unacceptableExpiration = System.currentTimeMillis() + 5000l;
+
+ assertTrue("TTL has not been overriden", unacceptableExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
+
+ acceptableExpiration = System.currentTimeMillis() + 20000l;
+
+ assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+
+
+ // Test the scenarios where both the minimum and maximum TTL have been set
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideBothTTl");
+ attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
+ attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 20000l);
+
+ queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has not been overriden", 70000l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+
+
+ }
+
+ private long getExpirationOnQueue(final AMQQueue queue, long arrivalTime, long expiration)
+ {
+ final List<QueueEntry> entries = new ArrayList<>();
+
+ ServerMessage message = createMessage(1l);
+ when(message.getArrivalTime()).thenReturn(arrivalTime);
+ when(message.getExpiration()).thenReturn(expiration);
+ queue.enqueue(message,null);
+ queue.visit(new QueueEntryVisitor()
+ {
+ @Override
+ public boolean visit(final QueueEntry entry)
+ {
+ entries.add(entry);
+ return true;
+ }
+ });
+ assertEquals("Expected only one entry in the queue", 1, entries.size());
+
+ Long entryExpiration =
+ (Long) entries.get(0).getInstanceProperties().getProperty(InstanceProperties.Property.EXPIRATION);
+
+ queue.clearQueue();
+ entries.clear();
+ return entryExpiration;
+ }
+
/**
* A helper method to put given number of messages into queue
* <p>