diff options
| author | Keith Wall <kwall@apache.org> | 2012-05-11 09:55:41 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-05-11 09:55:41 +0000 |
| commit | bece1e05e7264e85168b0e564623792904d322d6 (patch) | |
| tree | 95e69122feff11627adea8c78c1a55e1bcf5fae2 /qpid/java/broker/src | |
| parent | 2097a70e0e26af15de393a1129fd3ebeccd7b96b (diff) | |
| download | qpid-python-bece1e05e7264e85168b0e564623792904d322d6.tar.gz | |
QPID-3979: [Java Broker] Conflation queues
Fix bug with checking of head() within the loop that would have prevented a queue entry from being conflated.
Refactor code introducing more special Queue Entry to represent the corner cases.
Added systest that exercises a conflation queue with multiple producers/single consumer.
Applied patch by Phil Harvey <phil@philharveyonline.com>, and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1337094 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java | 76 |
1 files changed, 52 insertions, 24 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 0b95b9cc47..1e3bb3c50b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -37,6 +37,9 @@ public class ConflationQueueList extends SimpleQueueEntryList private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap = new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>(); + private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this); + private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this); + public ConflationQueueList(AMQQueue queue, String conflationKey) { super(queue); @@ -54,60 +57,85 @@ public class ConflationQueueList extends SimpleQueueEntryList return new ConflationQueueEntry(this, message); } + /** + * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary. + */ @Override public ConflationQueueEntry add(final ServerMessage message) { - final ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message)); + final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message)); final Object keyValue = message.getMessageHeader().getHeader(_conflationKey); if (keyValue != null) { - final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(entry); - AtomicReference<QueueEntry> latestValueReference = null; - QueueEntry oldEntry; + final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry); + AtomicReference<QueueEntry> entryReferenceFromMap = null; + QueueEntry entryFromMap; // Iterate until we have got a valid atomic reference object and either the referent is newer than the current - // entry, or the current entry has replaced it in the reference. Note that the head represents a special value + // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value // indicating that the reference object is no longer valid (it is being removed from the map). + boolean keepTryingToUpdateEntryReference = true; do { - latestValueReference = getOrPutIfAbsent(keyValue, referenceToEntry); - oldEntry = latestValueReference == null ? null : latestValueReference.get(); + do + { + entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry); + + // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value) + entryFromMap = entryReferenceFromMap.get(); + } + while(entryFromMap == _deleteInProgress); + + boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0; + + keepTryingToUpdateEntryReference = entryFromMapIsOlder + && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry); } - while(oldEntry != null - && oldEntry.compareTo(entry) < 0 - && oldEntry != getHead() - && !latestValueReference.compareAndSet(oldEntry, entry)); + while(keepTryingToUpdateEntryReference); - if (oldEntry == null) + if (entryFromMap == _newerEntryAlreadyBeenAndGone) { - // Unlikely: A newer entry came along and was consumed (and entry removed from map) - // during our processing of getOrPutIfAbsent(). In this case we know our entry has been superseded. - discardEntry(entry); + discardEntry(addedEntry); } - else if (oldEntry.compareTo(entry) > 0) + else if (entryFromMap.compareTo(addedEntry) > 0) { // A newer entry came along - discardEntry(entry); + discardEntry(addedEntry); } - else if (oldEntry.compareTo(entry) < 0) + else if (entryFromMap.compareTo(addedEntry) < 0) { // We replaced some other entry to become the newest value - discardEntry(oldEntry); + discardEntry(entryFromMap); } - entry.setLatestValueReference(latestValueReference); + addedEntry.setLatestValueReference(entryReferenceFromMap); } - return entry; + return addedEntry; } - private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToValue) + /** + * Returns: + * + * <ul> + * <li>the existing entry reference if the value already exists in the map, or</li> + * <li>referenceToValue if none exists, or</li> + * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently + * adds and removes during execution of this method.</li> + * </ul> + */ + private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue) { - AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToValue); + AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue); + if(latestValueReference == null) { latestValueReference = _latestValuesMap.get(key); + if(latestValueReference == null) + { + return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone); + } } return latestValueReference; } @@ -158,7 +186,7 @@ public class ConflationQueueList extends SimpleQueueEntryList { if(super.delete()) { - if(_latestValueReference != null && _latestValueReference.compareAndSet(this, getHead())) + if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) { Object key = getMessageHeader().getHeader(_conflationKey); _latestValuesMap.remove(key,_latestValueReference); |
