diff options
Diffstat (limited to 'qpid/java')
3 files changed, 77 insertions, 6 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 2ae7a17af2..784b75af10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -371,7 +371,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). */ - private volatile boolean _usingDispatcherForCleanup; + protected volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 8dc36673dc..49b77dcc7b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -795,11 +795,43 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { if (suspend) { - for (BasicMessageConsumer consumer : _consumers.values()) - { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), - Option.UNRELIABLE); - } + synchronized (getMessageDeliveryLock()) + { + for (BasicMessageConsumer consumer : _consumers.values()) + { + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); + sync(); + List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + _prefetchedMessageTags.addAll(tags); + } + } + + _usingDispatcherForCleanup = true; + syncDispatchQueue(); + _usingDispatcherForCleanup = false; + + RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + + prefetched.size()); + + for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + + for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + + flushProcessed(all, false); + getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + sync(); } else { diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java index d2950b095b..5c5ad66777 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java @@ -5,12 +5,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -133,4 +135,41 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get()); } + /** + * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer. + * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them. + * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue. + * Try to receive all 10 messages. + */ + public void testConnectionStop() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10"); + Connection con = getConnection(); + con.start(); + Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}"); + + MessageProducer prod = ssn.createProducer(queue); + for (int i=0; i<10;i++) + { + prod.send(ssn.createTextMessage("Msg" + i)); + } + + MessageConsumer consumer = ssn.createConsumer(queue); + // This is to ensure we get the first client to prefetch. + Message msg = consumer.receive(1000); + assertNotNull("The first consumer should get one message",msg); + con.stop(); + + Connection con2 = getConnection(); + con2.start(); + Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = ssn2.createConsumer(queue); + for (int i=0; i<9;i++) + { + TextMessage m = (TextMessage)consumer2.receive(1000); + assertNotNull("The second consumer should get 9 messages, but received only " + i,m); + } + } + } |
