summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java180
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java6
-rw-r--r--qpid/java/test-profiles/08Excludes2
-rw-r--r--qpid/java/test-profiles/08StandaloneExcludes2
4 files changed, 118 insertions, 72 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b14b92b014..9894efed20 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -508,8 +508,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
throws AMQException
{
_deliveredMessages.incrementAndGet();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
+ }
sub.send(entry);
-
}
private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
@@ -1172,9 +1175,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void deliverAsync()
{
- _stateChangeCount.incrementAndGet();
-
- Runner runner = new Runner();
+ Runner runner = new Runner(_stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1187,13 +1188,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(new SubFlushRunner(sub));
}
+
private class Runner implements ReadWriteRunnable
{
+ String _name;
+ public Runner(long count)
+ {
+ _name = "QueueRunner-" + count + "-" + _logActor;
+ }
+
public void run()
{
+ String originalName = Thread.currentThread().getName();
try
{
+ Thread.currentThread().setName(_name);
CurrentActor.set(_logActor);
+
processQueue(this);
}
catch (AMQException e)
@@ -1203,9 +1214,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
finally
{
CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
}
-
-
}
public boolean isRead()
@@ -1217,6 +1227,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return true;
}
+
+ public String toString()
+ {
+ return _name;
+ }
}
private class SubFlushRunner implements ReadWriteRunnable
@@ -1230,27 +1245,36 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void run()
{
- boolean complete = false;
- try
- {
- CurrentActor.set(_sub.getLogActor());
- complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
- }
- catch (AMQException e)
- {
- _logger.error(e);
+ String originalName = Thread.currentThread().getName();
+ try{
+ Thread.currentThread().setName("SubFlushRunner-"+_sub);
+
+ boolean complete = false;
+ try
+ {
+ CurrentActor.set(_sub.getLogActor());
+ complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ if (!complete && !_sub.isSuspended())
+ {
+ _asyncDelivery.execute(this);
+ }
}
finally
{
- CurrentActor.remove();
- }
- if (!complete && !_sub.isSuspended())
- {
- _asyncDelivery.execute(this);
+ Thread.currentThread().setName(originalName);
}
-
}
public boolean isRead()
@@ -1278,7 +1302,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
sub.getSendLock();
- atTail = attemptDelivery(sub);
+ atTail = attemptDelivery(sub);
if (atTail && sub.isAutoClose())
{
unregisterSubscription(sub);
@@ -1308,63 +1332,78 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return atTail;
}
+ /**
+ * Attempt delivery for the given subscription.
+ *
+ * Looks up the next node for the subscription and attempts to deliver it.
+ *
+ * @param sub
+ * @return
+ * @throws AMQException
+ */
private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
boolean advanced = false;
- boolean subActive = sub.isActive();
+ boolean subActive = sub.isActive() && !sub.isSuspended();
if (subActive)
{
QueueEntry node = moveSubscriptionToNextNode(sub);
+ _logger.debug(sub + ": attempt delivery: " + node.debugIdentity());
if (!(node.isAcquired() || node.isDeleted()))
{
- if (!sub.isSuspended())
+ if (sub.hasInterest(node))
{
- if (sub.hasInterest(node))
+ if (!sub.wouldSuspend(node))
{
- if (!sub.wouldSuspend(node))
+ if (!sub.isBrowser() && !node.acquire(sub))
{
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
- }
- else
+ sub.restoreCredit(node);
+ }
+ else
+ {
+ deliverMessage(sub, node);
+
+ if (sub.isBrowser())
{
- deliverMessage(sub, node);
+ QueueEntry newNode = _entries.next(node);
- if (sub.isBrowser())
+ if (newNode != null)
{
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
+ advanced = true;
+ sub.setLastSeenEntry(node, newNode);
+ node = sub.getLastSeenEntry();
}
+
}
-
- }
- else // Not enough Credit for message and wouldSuspend
- {
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
- node.addStateChangeListener(new QueueEntryListener(sub, node));
}
+
}
- else
+ else // Not enough Credit for message and wouldSuspend
{
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
+ //QPID-1187 - Treat the subscription as suspended for this message
+ // and wait for the message to be removed to continue delivery.
+
+ // 2009-09-30 : MR : setting subActive = false only causes, this
+ // particular delivery attempt to end. This is called from
+ // flushSubscription and processQueue both of which attempt
+ // delivery a number of times. Won't a bytes limited
+ // subscriber with not enough credit for the next message
+ // create a lot of new QELs? How about a browser that calls
+ // this method LONG.MAX_LONG times!
+ subActive = false;
+ node.addStateChangeListener(new QueueEntryListener(sub, node));
+ }
+ }
+ else
+ {
+ // this subscription is not interested in this node so we can skip over it
+ QueueEntry newNode = _entries.next(node);
+ if (newNode != null)
+ {
+ sub.setLastSeenEntry(node, newNode);
}
}
-
}
atTail = (_entries.next(node) == null) && !advanced;
}
@@ -1409,6 +1448,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity()));
+ }
+
return node;
}
@@ -1423,6 +1468,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asynchronousRunner.compareAndSet(runner, null);
+ // For every message enqueue/requeue the we fire deliveryAsync() which
+ // increases _stateChangeCount. If _sCC changes whilst we are in our loop
+ // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
+ // then we will continue to run for a maximum of iterations.
+ // So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
@@ -1442,20 +1492,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
//iterate over the subscribers and try to advance their pointer
while (subscriptionIter.advance())
{
- boolean closeConsumer = false;
Subscription sub = subscriptionIter.getNode().getSubscription();
sub.getSendLock();
try
{
- if (sub != null)
- {
-
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null)
- {
- done = attemptDelivery(sub);
- }
- }
+ done = attemptDelivery(sub);
if (done)
{
if (extraLoops == 0)
@@ -1492,11 +1533,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rescheduling runner:" + runner);
+ }
_asyncDelivery.execute(runner);
}
}
- @Override
public void checkMessageStatus() throws AMQException
{
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
index 2efe93eed8..ff766c907d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
@@ -144,6 +144,7 @@ public class RollbackOrderTest extends QpidTestCase
}
catch (JMSException e)
{
+ System.out.println("Error:" + e.getMessage());
exceptions[(int)count.getCount()] = e;
}
catch (AssertionFailedError cf)
@@ -154,6 +155,7 @@ public class RollbackOrderTest extends QpidTestCase
count.countDown();
}
+ System.out.println("Error:" + cf.getMessage());
System.err.println(cf.getMessage());
cf.printStackTrace();
failed.set(true);
@@ -175,11 +177,15 @@ public class RollbackOrderTest extends QpidTestCase
}
}
+// _consumer.close();
+ _connection.close();
+
assertFalse("Exceptions thrown during test run, Check Std.err.", failed.get());
}
@Override public void tearDown() throws Exception
{
+
drainQueue(_queue);
super.tearDown();
diff --git a/qpid/java/test-profiles/08Excludes b/qpid/java/test-profiles/08Excludes
index b277c6d929..6f3898384d 100644
--- a/qpid/java/test-profiles/08Excludes
+++ b/qpid/java/test-profiles/08Excludes
@@ -14,8 +14,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-org.apache.qpid.test.client.RollbackOrderTest#*
-
// QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is reliable
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes
index de876e06bb..ee781fb80f 100644
--- a/qpid/java/test-profiles/08StandaloneExcludes
+++ b/qpid/java/test-profiles/08StandaloneExcludes
@@ -39,8 +39,6 @@ org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
// QPID-1823: this takes ages to run
org.apache.qpid.client.SessionCreateTest#*
-org.apache.qpid.test.client.RollbackOrderTest#*
-
// This test requires the standard configuration file for validation.
// Excluding here does not reduce test coverage.
org.apache.qpid.server.configuration.ServerConfigurationFileTest#*