diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-05-12 14:55:59 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-05-12 14:55:59 +0000 |
| commit | 666dd961a4b47ad5e608f14c7477c50ef64196d8 (patch) | |
| tree | ff269534e12e726d7e52a105aa0a4ef729b64034 | |
| parent | 2ce5c11a417445ce65cbdf2b74c6451b13dd61b6 (diff) | |
| download | qpid-python-666dd961a4b47ad5e608f14c7477c50ef64196d8.tar.gz | |
QPID-2567 : Update the scavenge code based on feedback from Andrew Kennedy and Rob Godfrey.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@943534 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java | 21 | ||||
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java | 90 |
2 files changed, 16 insertions, 95 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 04ffefe93d..7332e66f99 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -391,25 +391,8 @@ public class QueueEntryImpl implements QueueEntry if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { - // - // We can't advanceHead() before the 'if' as we need to check if we - // are at the head or not. If we are not at the head then we should - // try scavenge the list. - // - // An alternative would be to provide a remove(this) method on - // the queueEntryList, this would however be less efficient as it - // would require that we walk the queue to locate this before we - // could perform the removal. - // - if (((QueueEntryImpl) _queueEntryList.getHead()).nextNode() != this) - { - _queueEntryList.advanceHead(); - _queueEntryList.scavenge(); - } - else - { - _queueEntryList.advanceHead(); - } + + _queueEntryList.advanceHead(); return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 454d9616dd..d22f50772b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -46,7 +46,6 @@ public class SimpleQueueEntryList implements QueueEntryList (QueueEntryImpl.class, QueueEntryImpl.class, "_next"); private AtomicLong _scavenges = new AtomicLong(0L); - private AtomicReference<Thread> _scavenger = new AtomicReference<Thread>(null); private static final long SCAVENGE_COUNT = Integer.getInteger("qpid.queue.scavenge_count", 50); @@ -61,90 +60,29 @@ public class SimpleQueueEntryList implements QueueEntryList void advanceHead() { - QueueEntryImpl head = _head.nextNode(); - while(head._next != null && head.isDeleted()) - { + QueueEntryImpl next = _head.nextNode(); + QueueEntryImpl newNext = _head.getNext(); - final QueueEntryImpl newhead = head.nextNode(); - if(newhead != null) - { - _nextUpdater.compareAndSet(_head,head, newhead); - } - head = _head.nextNode(); - } - } - - void scavenge() - { - _scavenges.incrementAndGet(); - - if (_scavenges.get() < SCAVENGE_COUNT) + if (!_nextUpdater.compareAndSet(_head, next, newNext)) { - return; + _scavenges.incrementAndGet(); } - try + if (_scavenges.get() > SCAVENGE_COUNT) { + _scavenges.set(0L); + scavenge(); + } + } - if (_scavenger.compareAndSet(null, Thread.currentThread())) - { - // only delete the number of scavenges requested. - // This should keep things fair when we have multiple consumers - // using selectors that will be calling this. - // With multiple consumers this will also be called but - // advanceHead should take care of it in most instances. - // Often it will be the ExpiredMessageTask so will not - // affect throughput. - long deletesToPerform = _scavenges.getAndSet(0); - - - QueueEntryImpl root = _head; - QueueEntryImpl next = root.nextNode(); - - do - { - - while (next._next != null && next.isDeleted()) - { - - final QueueEntryImpl newhead = next.nextNode(); - if (newhead != null) - { - _nextUpdater.compareAndSet(root, next, newhead); - } - next = root.nextNode(); - } - if (next._next != null) - { - if (!next.isDeleted()) - { - root = next; - next = root.nextNode(); - deletesToPerform--; - } - - // Limit the number of scavenges performed by this - // thread. For fairness. - if (deletesToPerform == 0) - { - return; - } - - } - else - { - break; - } + private void scavenge() + { + QueueEntryImpl next = _head.getNext(); - } - while (next != null && next._next != null); - } - } - finally + while (next != null) { - _scavenger.compareAndSet(Thread.currentThread(), null); + next = next.getNext(); } - } public AMQQueue getQueue() |
