summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-05-11 09:55:41 +0000
committerKeith Wall <kwall@apache.org>2012-05-11 09:55:41 +0000
commitbece1e05e7264e85168b0e564623792904d322d6 (patch)
tree95e69122feff11627adea8c78c1a55e1bcf5fae2 /qpid/java/broker/src
parent2097a70e0e26af15de393a1129fd3ebeccd7b96b (diff)
downloadqpid-python-bece1e05e7264e85168b0e564623792904d322d6.tar.gz
QPID-3979: [Java Broker] Conflation queues
Fix bug with checking of head() within the loop that would have prevented a queue entry from being conflated. Refactor code introducing more special Queue Entry to represent the corner cases. Added systest that exercises a conflation queue with multiple producers/single consumer. Applied patch by Phil Harvey <phil@philharveyonline.com>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1337094 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java76
1 files changed, 52 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 0b95b9cc47..1e3bb3c50b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -37,6 +37,9 @@ public class ConflationQueueList extends SimpleQueueEntryList
private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+ private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
+ private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+
public ConflationQueueList(AMQQueue queue, String conflationKey)
{
super(queue);
@@ -54,60 +57,85 @@ public class ConflationQueueList extends SimpleQueueEntryList
return new ConflationQueueEntry(this, message);
}
+ /**
+ * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
+ */
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- final ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
+ final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
if (keyValue != null)
{
- final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(entry);
- AtomicReference<QueueEntry> latestValueReference = null;
- QueueEntry oldEntry;
+ final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
+ AtomicReference<QueueEntry> entryReferenceFromMap = null;
+ QueueEntry entryFromMap;
// Iterate until we have got a valid atomic reference object and either the referent is newer than the current
- // entry, or the current entry has replaced it in the reference. Note that the head represents a special value
+ // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
// indicating that the reference object is no longer valid (it is being removed from the map).
+ boolean keepTryingToUpdateEntryReference = true;
do
{
- latestValueReference = getOrPutIfAbsent(keyValue, referenceToEntry);
- oldEntry = latestValueReference == null ? null : latestValueReference.get();
+ do
+ {
+ entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry);
+
+ // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value)
+ entryFromMap = entryReferenceFromMap.get();
+ }
+ while(entryFromMap == _deleteInProgress);
+
+ boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0;
+
+ keepTryingToUpdateEntryReference = entryFromMapIsOlder
+ && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry);
}
- while(oldEntry != null
- && oldEntry.compareTo(entry) < 0
- && oldEntry != getHead()
- && !latestValueReference.compareAndSet(oldEntry, entry));
+ while(keepTryingToUpdateEntryReference);
- if (oldEntry == null)
+ if (entryFromMap == _newerEntryAlreadyBeenAndGone)
{
- // Unlikely: A newer entry came along and was consumed (and entry removed from map)
- // during our processing of getOrPutIfAbsent(). In this case we know our entry has been superseded.
- discardEntry(entry);
+ discardEntry(addedEntry);
}
- else if (oldEntry.compareTo(entry) > 0)
+ else if (entryFromMap.compareTo(addedEntry) > 0)
{
// A newer entry came along
- discardEntry(entry);
+ discardEntry(addedEntry);
}
- else if (oldEntry.compareTo(entry) < 0)
+ else if (entryFromMap.compareTo(addedEntry) < 0)
{
// We replaced some other entry to become the newest value
- discardEntry(oldEntry);
+ discardEntry(entryFromMap);
}
- entry.setLatestValueReference(latestValueReference);
+ addedEntry.setLatestValueReference(entryReferenceFromMap);
}
- return entry;
+ return addedEntry;
}
- private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToValue)
+ /**
+ * Returns:
+ *
+ * <ul>
+ * <li>the existing entry reference if the value already exists in the map, or</li>
+ * <li>referenceToValue if none exists, or</li>
+ * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently
+ * adds and removes during execution of this method.</li>
+ * </ul>
+ */
+ private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
{
- AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToValue);
+ AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+
if(latestValueReference == null)
{
latestValueReference = _latestValuesMap.get(key);
+ if(latestValueReference == null)
+ {
+ return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+ }
}
return latestValueReference;
}
@@ -158,7 +186,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
{
if(super.delete())
{
- if(_latestValueReference != null && _latestValueReference.compareAndSet(this, getHead()))
+ if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
{
Object key = getMessageHeader().getHeader(_conflationKey);
_latestValuesMap.remove(key,_latestValueReference);