From f7f0bf52f98429935d9a964776c905ce5e54258f Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 6 Oct 2011 16:29:27 +0000 Subject: QPID-3526, QPID-3524: make sure the 0-10 client message.acknowledge() actually acknowledges messages immediately, and does so synchronously, adding test to verify behaviour. Split acknowledge() and commit() methods into version specific session implementations for clarity/reuse, align 0-10 and 0-8/9 transacted publishing behaviour, refactor preDeliver and postDeliver methods, remove dead code from consumers. Applied patch from Oleksandr Rudyy and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1179695 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 86 +++++++--------------- .../org/apache/qpid/client/AMQSession_0_10.java | 77 +++++++++---------- .../org/apache/qpid/client/AMQSession_0_8.java | 29 +++++++- .../apache/qpid/client/BasicMessageConsumer.java | 69 +++++------------ .../qpid/client/BasicMessageConsumer_0_10.java | 6 +- .../apache/qpid/client/BasicMessageProducer.java | 10 --- .../qpid/test/unit/message/TestAMQSession.java | 7 +- 7 files changed, 118 insertions(+), 166 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 566c0320e9..a41f2f9b17 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -612,30 +612,27 @@ public abstract class AMQSession 0 ) - { - messageAcknowledge(_txRangeSet, true); - _txRangeSet.clear(); - _txSize = 0; - } - sendCommit(); - } - catch(TransportException e) + if( _txSize > 0 ) { - throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); + messageAcknowledge(_txRangeSet, true); + _txRangeSet.clear(); + _txSize = 0; } - catch (AMQException e) + + getQpidSession().setAutoSync(true); + try { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + getQpidSession().txCommit(); } - catch (FailoverException e) + finally { - throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); + getQpidSession().setAutoSync(false); } + // We need to sync so that we get notify of an error. + sync(); } protected final boolean tagLE(long tag1, long tag2) @@ -1385,4 +1368,14 @@ public class AMQSession_0_10 extends AMQSession 0 ) + { + messageAcknowledge(range, true); + getQpidSession().sync(); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3ae6af6350..ccb2b00947 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -76,6 +76,7 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +126,20 @@ public final class AMQSession_0_8 extends AMQSession extends Closeable implements Messa */ protected final int _acknowledgeMode; - /** - * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode - */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; - /** * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ private ConcurrentLinkedQueue _receivedDeliveryTags = new ConcurrentLinkedQueue(); - /** The last tag that was "multiple" acknowledged on this session (if transacted) */ - private long _lastAcked; - - /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */ - private final SortedSet _previouslyAcked = new TreeSet(); - - private final Object _commitLock = new Object(); - /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -290,17 +268,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } } - protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException - { - if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); - } - - _session.setInRecovery(false); - preDeliver(jmsMsg); - } - /** * @param immediate if true then return immediately if the connection is failing over * @@ -409,7 +376,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { - preApplicationProcessing(m); + preDeliver(m); postDeliver(m); } return m; @@ -482,7 +449,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { - preApplicationProcessing(m); + preDeliver(m); postDeliver(m); } @@ -734,7 +701,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { if (isMessageListenerSet()) { - preApplicationProcessing(jmsMessage); + preDeliver(jmsMessage); getMessageListener().onMessage(jmsMessage); postDeliver(jmsMessage); } @@ -758,19 +725,28 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa } } - void preDeliver(AbstractJMSMessage msg) + protected void preDeliver(AbstractJMSMessage msg) { + _session.setInRecovery(false); + switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: _session.acknowledgeMessage(msg.getDeliveryTag(), false); break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + _session.addUnacknowledgedMessage(msg.getDeliveryTag()); + _session.markDirty(); + } break; case Session.SESSION_TRANSACTED: if (isNoConsume()) @@ -792,15 +768,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { switch (_acknowledgeMode) { - - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - _session.markDirty(); - break; - case Session.DUPS_OK_ACKNOWLEDGE: case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 47da59724c..d3494298d3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -203,9 +203,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer