diff options
Diffstat (limited to 'qpid/java')
4 files changed, 118 insertions, 72 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b14b92b014..9894efed20 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -508,8 +508,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener throws AMQException { _deliveredMessages.incrementAndGet(); + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity()); + } sub.send(entry); - } private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) @@ -1172,9 +1175,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void deliverAsync() { - _stateChangeCount.incrementAndGet(); - - Runner runner = new Runner(); + Runner runner = new Runner(_stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1187,13 +1188,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(new SubFlushRunner(sub)); } + private class Runner implements ReadWriteRunnable { + String _name; + public Runner(long count) + { + _name = "QueueRunner-" + count + "-" + _logActor; + } + public void run() { + String originalName = Thread.currentThread().getName(); try { + Thread.currentThread().setName(_name); CurrentActor.set(_logActor); + processQueue(this); } catch (AMQException e) @@ -1203,9 +1214,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener finally { CurrentActor.remove(); + Thread.currentThread().setName(originalName); } - - } public boolean isRead() @@ -1217,6 +1227,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return true; } + + public String toString() + { + return _name; + } } private class SubFlushRunner implements ReadWriteRunnable @@ -1230,27 +1245,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void run() { - boolean complete = false; - try - { - CurrentActor.set(_sub.getLogActor()); - complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); - } - catch (AMQException e) - { - _logger.error(e); + String originalName = Thread.currentThread().getName(); + try{ + Thread.currentThread().setName("SubFlushRunner-"+_sub); + + boolean complete = false; + try + { + CurrentActor.set(_sub.getLogActor()); + complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); + + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + } + if (!complete && !_sub.isSuspended()) + { + _asyncDelivery.execute(this); + } } finally { - CurrentActor.remove(); - } - if (!complete && !_sub.isSuspended()) - { - _asyncDelivery.execute(this); + Thread.currentThread().setName(originalName); } - } public boolean isRead() @@ -1278,7 +1302,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { sub.getSendLock(); - atTail = attemptDelivery(sub); + atTail = attemptDelivery(sub); if (atTail && sub.isAutoClose()) { unregisterSubscription(sub); @@ -1308,63 +1332,78 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return atTail; } + /** + * Attempt delivery for the given subscription. + * + * Looks up the next node for the subscription and attempts to deliver it. + * + * @param sub + * @return + * @throws AMQException + */ private boolean attemptDelivery(Subscription sub) throws AMQException { boolean atTail = false; boolean advanced = false; - boolean subActive = sub.isActive(); + boolean subActive = sub.isActive() && !sub.isSuspended(); if (subActive) { QueueEntry node = moveSubscriptionToNextNode(sub); + _logger.debug(sub + ": attempt delivery: " + node.debugIdentity()); if (!(node.isAcquired() || node.isDeleted())) { - if (!sub.isSuspended()) + if (sub.hasInterest(node)) { - if (sub.hasInterest(node)) + if (!sub.wouldSuspend(node)) { - if (!sub.wouldSuspend(node)) + if (!sub.isBrowser() && !node.acquire(sub)) { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - } - else + sub.restoreCredit(node); + } + else + { + deliverMessage(sub, node); + + if (sub.isBrowser()) { - deliverMessage(sub, node); + QueueEntry newNode = _entries.next(node); - if (sub.isBrowser()) + if (newNode != null) { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } + advanced = true; + sub.setLastSeenEntry(node, newNode); + node = sub.getLastSeenEntry(); } + } - - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub, node)); } + } - else + else // Not enough Credit for message and wouldSuspend { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } + //QPID-1187 - Treat the subscription as suspended for this message + // and wait for the message to be removed to continue delivery. + + // 2009-09-30 : MR : setting subActive = false only causes, this + // particular delivery attempt to end. This is called from + // flushSubscription and processQueue both of which attempt + // delivery a number of times. Won't a bytes limited + // subscriber with not enough credit for the next message + // create a lot of new QELs? How about a browser that calls + // this method LONG.MAX_LONG times! + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub, node)); + } + } + else + { + // this subscription is not interested in this node so we can skip over it + QueueEntry newNode = _entries.next(node); + if (newNode != null) + { + sub.setLastSeenEntry(node, newNode); } } - } atTail = (_entries.next(node) == null) && !advanced; } @@ -1409,6 +1448,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + + if (_logger.isDebugEnabled()) + { + _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity())); + } + return node; } @@ -1423,6 +1468,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asynchronousRunner.compareAndSet(runner, null); + // For every message enqueue/requeue the we fire deliveryAsync() which + // increases _stateChangeCount. If _sCC changes whilst we are in our loop + // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) + // then we will continue to run for a maximum of iterations. + // So whilst delivery/rejection is going on a processQueue thread will be running while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move @@ -1442,20 +1492,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //iterate over the subscribers and try to advance their pointer while (subscriptionIter.advance()) { - boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); sub.getSendLock(); try { - if (sub != null) - { - - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null) - { - done = attemptDelivery(sub); - } - } + done = attemptDelivery(sub); if (done) { if (extraLoops == 0) @@ -1492,11 +1533,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rescheduling runner:" + runner); + } _asyncDelivery.execute(runner); } } - @Override public void checkMessageStatus() throws AMQException { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java index 2efe93eed8..ff766c907d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java @@ -144,6 +144,7 @@ public class RollbackOrderTest extends QpidTestCase } catch (JMSException e) { + System.out.println("Error:" + e.getMessage()); exceptions[(int)count.getCount()] = e; } catch (AssertionFailedError cf) @@ -154,6 +155,7 @@ public class RollbackOrderTest extends QpidTestCase count.countDown(); } + System.out.println("Error:" + cf.getMessage()); System.err.println(cf.getMessage()); cf.printStackTrace(); failed.set(true); @@ -175,11 +177,15 @@ public class RollbackOrderTest extends QpidTestCase } } +// _consumer.close(); + _connection.close(); + assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get()); } @Override public void tearDown() throws Exception { + drainQueue(_queue); super.tearDown(); diff --git a/qpid/java/test-profiles/08Excludes b/qpid/java/test-profiles/08Excludes index b277c6d929..6f3898384d 100644 --- a/qpid/java/test-profiles/08Excludes +++ b/qpid/java/test-profiles/08Excludes @@ -14,8 +14,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* // QPID-1823: this takes ages to run org.apache.qpid.client.SessionCreateTest#* -org.apache.qpid.test.client.RollbackOrderTest#* - // QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable org.apache.qpid.management.jmx.ManagementActorLoggingTest#* org.apache.qpid.server.queue.ModelTest#* diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes index de876e06bb..ee781fb80f 100644 --- a/qpid/java/test-profiles/08StandaloneExcludes +++ b/qpid/java/test-profiles/08StandaloneExcludes @@ -39,8 +39,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* // QPID-1823: this takes ages to run org.apache.qpid.client.SessionCreateTest#* -org.apache.qpid.test.client.RollbackOrderTest#* - // This test requires the standard configuration file for validation. // Excluding here does not reduce test coverage. org.apache.qpid.server.configuration.ServerConfigurationFileTest#* |
