diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 278 |
1 files changed, 206 insertions, 72 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b14b92b014..d781dc4dea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1,11 +1,10 @@ package org.apache.qpid.server.queue; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -35,6 +34,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.AMQChannel; /* * @@ -96,6 +96,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private final Executor _asyncDelivery; private final AtomicLong _totalMessagesReceived = new AtomicLong(); + private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>(); + /** max allowed size(KB) of a single message */ public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize(); @@ -122,6 +124,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private LogSubject _logSubject; private LogActor _logActor; + + private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity(); + private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity(); + private final AtomicBoolean _overfull = new AtomicBoolean(false); + protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { @@ -508,8 +515,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { _deliveredMessages.incrementAndGet(); + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity()); + } sub.send(entry); - } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) @@ -626,6 +636,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throw new FailedDequeueException(_name.toString(), e); } + checkCapacity(); + } private void decrementQueueSize(final QueueEntry entry) @@ -1170,11 +1182,64 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - public void deliverAsync() + public void checkCapacity(AMQChannel channel) { - _stateChangeCount.incrementAndGet(); + if(_capacity != 0l) + { + if(_atomicQueueSize.get() > _capacity) + { + _overfull.set(true); + //Overfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity)); + + if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null) + { + channel.block(this); + } + + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + + //Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + + channel.unblock(this); + _blockedChannels.remove(channel); + + } + + } + + + + } + } + + private void checkCapacity() + { + if(_capacity != 0L) + { + if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) + { + if(_overfull.compareAndSet(true,false)) + {//Underfull log message + _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity)); + } - Runner runner = new Runner(); + + for(AMQChannel c : _blockedChannels.keySet()) + { + c.unblock(this); + _blockedChannels.remove(c); + } + } + } + } + + + public void deliverAsync() + { + Runner runner = new Runner(_stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1187,13 +1252,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(new SubFlushRunner(sub)); } + private class Runner implements ReadWriteRunnable { + String _name; + public Runner(long count) + { + _name = "QueueRunner-" + count + "-" + _logActor; + } + public void run() { + String originalName = Thread.currentThread().getName(); try { + Thread.currentThread().setName(_name); CurrentActor.set(_logActor); + processQueue(this); } catch (AMQException e) @@ -1203,9 +1278,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener finally { CurrentActor.remove(); + Thread.currentThread().setName(originalName); } - - } public boolean isRead() @@ -1217,6 +1291,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return true; } + + public String toString() + { + return _name; + } } private class SubFlushRunner implements ReadWriteRunnable @@ -1230,27 +1309,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void run() { - boolean complete = false; - try - { - CurrentActor.set(_sub.getLogActor()); - complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); - } - catch (AMQException e) - { - _logger.error(e); + String originalName = Thread.currentThread().getName(); + try{ + Thread.currentThread().setName("SubFlushRunner-"+_sub); + + boolean complete = false; + try + { + CurrentActor.set(_sub.getLogActor()); + complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); + + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + } + if (!complete && !_sub.isSuspended()) + { + _asyncDelivery.execute(this); + } } finally { - CurrentActor.remove(); - } - if (!complete && !_sub.isSuspended()) - { - _asyncDelivery.execute(this); + Thread.currentThread().setName(originalName); } - } public boolean isRead() @@ -1278,7 +1366,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { sub.getSendLock(); - atTail = attemptDelivery(sub); + atTail = attemptDelivery(sub); if (atTail && sub.isAutoClose()) { unregisterSubscription(sub); @@ -1308,63 +1396,81 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return atTail; } + /** + * Attempt delivery for the given subscription. + * + * Looks up the next node for the subscription and attempts to deliver it. + * + * @param sub + * @return true if we have completed all possible deliveries for this sub. + * @throws AMQException + */ private boolean attemptDelivery(Subscription sub) throws AMQException { boolean atTail = false; boolean advanced = false; - boolean subActive = sub.isActive(); + boolean subActive = sub.isActive() && !sub.isSuspended(); if (subActive) { QueueEntry node = moveSubscriptionToNextNode(sub); + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity()); + } if (!(node.isAcquired() || node.isDeleted())) { - if (!sub.isSuspended()) + if (sub.hasInterest(node)) { - if (sub.hasInterest(node)) + if (!sub.wouldSuspend(node)) { - if (!sub.wouldSuspend(node)) + if (!sub.isBrowser() && !node.acquire(sub)) { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - } - else + sub.restoreCredit(node); + } + else + { + deliverMessage(sub, node); + + if (sub.isBrowser()) { - deliverMessage(sub, node); + QueueEntry newNode = _entries.next(node); - if (sub.isBrowser()) + if (newNode != null) { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } + advanced = true; + sub.setLastSeenEntry(node, newNode); + node = sub.getLastSeenEntry(); } + } - - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub, node)); } + } - else + else // Not enough Credit for message and wouldSuspend { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } + //QPID-1187 - Treat the subscription as suspended for this message + // and wait for the message to be removed to continue delivery. + + // 2009-09-30 : MR : setting subActive = false only causes, this + // particular delivery attempt to end. This is called from + // flushSubscription and processQueue both of which attempt + // delivery a number of times. Won't a bytes limited + // subscriber with not enough credit for the next message + // create a lot of new QELs? How about a browser that calls + // this method LONG.MAX_LONG times! + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub, node)); + } + } + else + { + // this subscription is not interested in this node so we can skip over it + QueueEntry newNode = _entries.next(node); + if (newNode != null) + { + sub.setLastSeenEntry(node, newNode); } } - } atTail = (_entries.next(node) == null) && !advanced; } @@ -1409,6 +1515,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity())); + } + return node; } @@ -1423,6 +1535,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asynchronousRunner.compareAndSet(runner, null); + // For every message enqueue/requeue the we fire deliveryAsync() which + // increases _stateChangeCount. If _sCC changes whilst we are in our loop + // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) + // then we will continue to run for a maximum of iterations. + // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move @@ -1442,20 +1559,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //iterate over the subscribers and try to advance their pointer while (subscriptionIter.advance()) { - boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); sub.getSendLock(); try { - if (sub != null) - { - - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null) - { - done = attemptDelivery(sub); - } - } + done = attemptDelivery(sub); if (done) { if (extraLoops == 0) @@ -1492,11 +1600,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rescheduling runner:" + runner); + } _asyncDelivery.execute(runner); } } - @Override public void checkMessageStatus() throws AMQException { @@ -1604,6 +1715,27 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + public long getCapacity() + { + return _capacity; + } + + public void setCapacity(long capacity) + { + _capacity = capacity; + } + + public long getFlowResumeCapacity() + { + return _flowResumeCapacity; + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + _flowResumeCapacity = flowResumeCapacity; + } + + public Set<NotificationCheck> getNotificationChecks() { return _notificationChecks; @@ -1673,6 +1805,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setMaximumMessageSize(config.getMaximumMessageSize()); setMaximumMessageCount(config.getMaximumMessageCount()); setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); + _capacity = config.getCapacity(); + _flowResumeCapacity = config.getFlowResumeCapacity(); } } } |