summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-05-12 14:55:59 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-05-12 14:55:59 +0000
commit666dd961a4b47ad5e608f14c7477c50ef64196d8 (patch)
treeff269534e12e726d7e52a105aa0a4ef729b64034
parent2ce5c11a417445ce65cbdf2b74c6451b13dd61b6 (diff)
downloadqpid-python-666dd961a4b47ad5e608f14c7477c50ef64196d8.tar.gz
QPID-2567 : Update the scavenge code based on feedback from Andrew Kennedy and Rob Godfrey.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.5.x-dev@943534 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java21
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java90
2 files changed, 16 insertions, 95 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 04ffefe93d..7332e66f99 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -391,25 +391,8 @@ public class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- //
- // We can't advanceHead() before the 'if' as we need to check if we
- // are at the head or not. If we are not at the head then we should
- // try scavenge the list.
- //
- // An alternative would be to provide a remove(this) method on
- // the queueEntryList, this would however be less efficient as it
- // would require that we walk the queue to locate this before we
- // could perform the removal.
- //
- if (((QueueEntryImpl) _queueEntryList.getHead()).nextNode() != this)
- {
- _queueEntryList.advanceHead();
- _queueEntryList.scavenge();
- }
- else
- {
- _queueEntryList.advanceHead();
- }
+
+ _queueEntryList.advanceHead();
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 454d9616dd..d22f50772b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -46,7 +46,6 @@ public class SimpleQueueEntryList implements QueueEntryList
(QueueEntryImpl.class, QueueEntryImpl.class, "_next");
private AtomicLong _scavenges = new AtomicLong(0L);
- private AtomicReference<Thread> _scavenger = new AtomicReference<Thread>(null);
private static final long SCAVENGE_COUNT = Integer.getInteger("qpid.queue.scavenge_count", 50);
@@ -61,90 +60,29 @@ public class SimpleQueueEntryList implements QueueEntryList
void advanceHead()
{
- QueueEntryImpl head = _head.nextNode();
- while(head._next != null && head.isDeleted())
- {
+ QueueEntryImpl next = _head.nextNode();
+ QueueEntryImpl newNext = _head.getNext();
- final QueueEntryImpl newhead = head.nextNode();
- if(newhead != null)
- {
- _nextUpdater.compareAndSet(_head,head, newhead);
- }
- head = _head.nextNode();
- }
- }
-
- void scavenge()
- {
- _scavenges.incrementAndGet();
-
- if (_scavenges.get() < SCAVENGE_COUNT)
+ if (!_nextUpdater.compareAndSet(_head, next, newNext))
{
- return;
+ _scavenges.incrementAndGet();
}
- try
+ if (_scavenges.get() > SCAVENGE_COUNT)
{
+ _scavenges.set(0L);
+ scavenge();
+ }
+ }
- if (_scavenger.compareAndSet(null, Thread.currentThread()))
- {
- // only delete the number of scavenges requested.
- // This should keep things fair when we have multiple consumers
- // using selectors that will be calling this.
- // With multiple consumers this will also be called but
- // advanceHead should take care of it in most instances.
- // Often it will be the ExpiredMessageTask so will not
- // affect throughput.
- long deletesToPerform = _scavenges.getAndSet(0);
-
-
- QueueEntryImpl root = _head;
- QueueEntryImpl next = root.nextNode();
-
- do
- {
-
- while (next._next != null && next.isDeleted())
- {
-
- final QueueEntryImpl newhead = next.nextNode();
- if (newhead != null)
- {
- _nextUpdater.compareAndSet(root, next, newhead);
- }
- next = root.nextNode();
- }
- if (next._next != null)
- {
- if (!next.isDeleted())
- {
- root = next;
- next = root.nextNode();
- deletesToPerform--;
- }
-
- // Limit the number of scavenges performed by this
- // thread. For fairness.
- if (deletesToPerform == 0)
- {
- return;
- }
-
- }
- else
- {
- break;
- }
+ private void scavenge()
+ {
+ QueueEntryImpl next = _head.getNext();
- }
- while (next != null && next._next != null);
- }
- }
- finally
+ while (next != null)
{
- _scavenger.compareAndSet(Thread.currentThread(), null);
+ next = next.getNext();
}
-
}
public AMQQueue getQueue()