diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-10-06 16:29:27 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-10-06 16:29:27 +0000 |
| commit | f7f0bf52f98429935d9a964776c905ce5e54258f (patch) | |
| tree | 4fdba543694c46346fd1c515b2594e543c5dc263 /qpid/java/client | |
| parent | 67dfbf8458fb1b7e8b54a417d0d831176b6ddbe3 (diff) | |
| download | qpid-python-f7f0bf52f98429935d9a964776c905ce5e54258f.tar.gz | |
QPID-3526, QPID-3524: make sure the 0-10 client message.acknowledge() actually acknowledges messages immediately, and does so synchronously, adding test to verify behaviour. Split acknowledge() and commit() methods into version specific session implementations for clarity/reuse, align 0-10 and 0-8/9 transacted publishing behaviour, refactor preDeliver and postDeliver methods, remove dead code from consumers.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1179695 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
7 files changed, 118 insertions, 166 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 566c0320e9..a41f2f9b17 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -612,30 +612,27 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { throw new IllegalStateException("Session is already closed"); } - else if (hasFailedOver()) + else if (hasFailedOverDirty()) { + //perform an implicit recover in this scenario + recover(); + + //notify the consumer throw new IllegalStateException("has failed over"); } - while (true) + try { - Long tag = _unacknowledgedMessageTags.poll(); - if (tag == null) - { - break; - } - - try - { - acknowledgeMessage(tag, false); - } - catch (TransportException e) - { - throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e); - } + acknowledgeImpl(); + } + catch (TransportException e) + { + throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e); } } + protected abstract void acknowledgeImpl() throws JMSException; + /** * Acknowledge one or many messages. * @@ -844,42 +841,28 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does * not mean that the commit is known to have failed, merely that it is not known whether it * failed or not. - * @todo Be aware of possible changes to parameter order as versions change. */ public void commit() throws JMSException { checkTransacted(); - try + //Check that we are clean to commit. + if (_failedOverDirty) { - //Check that we are clean to commit. - if (_failedOverDirty) - { - rollback(); - - throw new TransactionRolledBackException("Connection failover has occured since last send. " + - "Forced rollback"); - } + rollback(); + throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." + + "The session transaction was rolled back."); + } - // Acknowledge all delivered messages - while (true) - { - Long tag = _deliveredMessageTags.poll(); - if (tag == null) - { - break; - } - - acknowledgeMessage(tag, false); - } - // Commits outstanding messages and acknowledgments - sendCommit(); + try + { + commitImpl(); markClean(); } catch (AMQException e) { - throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e); + throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e); } catch (FailoverException e) { @@ -891,8 +874,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - public abstract void sendCommit() throws AMQException, FailoverException; - + protected abstract void commitImpl() throws AMQException, FailoverException, TransportException; public void confirmConsumerCancelled(int consumerTag) { @@ -1580,10 +1562,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic abstract public void sync() throws AMQException; - public int getAcknowledgeMode() throws JMSException + public int getAcknowledgeMode() { - checkNotClosed(); - return _acknowledgeMode; } @@ -1643,10 +1623,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _ticket; } - public boolean getTransacted() throws JMSException + public boolean getTransacted() { - checkNotClosed(); - return _transacted; } @@ -3096,21 +3074,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * * @return boolean true if failover has occured. */ - public boolean hasFailedOver() + public boolean hasFailedOverDirty() { return _failedOverDirty; } - /** - * Check to see if any message have been sent in this transaction and have not been commited. - * - * @return boolean true if a message has been sent but not commited - */ - public boolean isDirty() - { - return _dirty; - } - public void setTicket(int ticket) { _ticket = ticket; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 0cca946f19..8f77cc6258 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -412,25 +412,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - - /** - * Commit the receipt and the delivery of all messages exchanged by this session resources. - */ - public void sendCommit() throws AMQException, FailoverException - { - getQpidSession().setAutoSync(true); - try - { - getQpidSession().txCommit(); - } - finally - { - getQpidSession().setAutoSync(false); - } - // We need to sync so that we get notify of an error. - sync(); - } - /** * Create a queue with a given name. * @@ -463,6 +444,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendRecover() throws AMQException, FailoverException { // release all unacked messages + RangeSet ranges = gatherUnackedRangeSet(); + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + // We need to sync so that we get notify of an error. + sync(); + } + + private RangeSet gatherUnackedRangeSet() + { RangeSet ranges = new RangeSet(); while (true) { @@ -471,11 +460,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { break; } - ranges.add((int) (long) tag); + + ranges.add(tag.intValue()); } - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); - // We need to sync so that we get notify of an error. - sync(); + + return ranges; } @@ -997,32 +986,26 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - @Override - public void commit() throws JMSException + public void commitImpl() throws AMQException, FailoverException, TransportException { - checkTransacted(); - try - { - if( _txSize > 0 ) - { - messageAcknowledge(_txRangeSet, true); - _txRangeSet.clear(); - _txSize = 0; - } - sendCommit(); - } - catch(TransportException e) + if( _txSize > 0 ) { - throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e); + messageAcknowledge(_txRangeSet, true); + _txRangeSet.clear(); + _txSize = 0; } - catch (AMQException e) + + getQpidSession().setAutoSync(true); + try { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + getQpidSession().txCommit(); } - catch (FailoverException e) + finally { - throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); + getQpidSession().setAutoSync(false); } + // We need to sync so that we get notify of an error. + sync(); } protected final boolean tagLE(long tag1, long tag2) @@ -1385,4 +1368,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return sb.toString(); } + protected void acknowledgeImpl() + { + RangeSet range = gatherUnackedRangeSet(); + + if(range.size() > 0 ) + { + messageAcknowledge(range, true); + getQpidSession().sync(); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3ae6af6350..ccb2b00947 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -76,6 +76,7 @@ import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.transport.TransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +126,20 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B return getProtocolHandler().getProtocolVersion(); } + protected void acknowledgeImpl() + { + while (true) + { + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + + acknowledgeMessage(tag, false); + } + } + public void acknowledgeMessage(long deliveryTag, boolean multiple) { BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple); @@ -170,8 +185,20 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } } - public void sendCommit() throws AMQException, FailoverException + public void commitImpl() throws AMQException, FailoverException, TransportException { + // Acknowledge all delivered messages + while (true) + { + Long tag = _deliveredMessageTags.poll(); + if (tag == null) + { + break; + } + + acknowledgeMessage(tag, false); + } + final AMQProtocolHandler handler = getProtocolHandler(); handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0dc8c8c3ed..e6e1398a35 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -37,10 +37,7 @@ import javax.jms.MessageListener; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.SortedSet; import java.util.ArrayList; -import java.util.Collections; -import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -118,29 +115,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa protected final int _acknowledgeMode; /** - * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode - */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; - - /** * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */ private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>(); - /** The last tag that was "multiple" acknowledged on this session (if transacted) */ - private long _lastAcked; - - /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */ - private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>(); - - private final Object _commitLock = new Object(); - /** * The thread that was used to call receive(). This is important for being able to interrupt that thread if a * receive() is in progress. @@ -290,17 +268,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } } - protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException - { - if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); - } - - _session.setInRecovery(false); - preDeliver(jmsMsg); - } - /** * @param immediate if true then return immediately if the connection is failing over * @@ -409,7 +376,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { - preApplicationProcessing(m); + preDeliver(m); postDeliver(m); } return m; @@ -482,7 +449,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) { - preApplicationProcessing(m); + preDeliver(m); postDeliver(m); } @@ -734,7 +701,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { if (isMessageListenerSet()) { - preApplicationProcessing(jmsMessage); + preDeliver(jmsMessage); getMessageListener().onMessage(jmsMessage); postDeliver(jmsMessage); } @@ -758,19 +725,28 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } } - void preDeliver(AbstractJMSMessage msg) + protected void preDeliver(AbstractJMSMessage msg) { + _session.setInRecovery(false); + switch (_acknowledgeMode) { - case Session.PRE_ACKNOWLEDGE: _session.acknowledgeMessage(msg.getDeliveryTag(), false); break; - case Session.CLIENT_ACKNOWLEDGE: - // we set the session so that when the user calls acknowledge() it can call the method on session - // to send out the appropriate frame - msg.setAMQSession(_session); + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + msg.setAMQSession(_session); + _session.addUnacknowledgedMessage(msg.getDeliveryTag()); + _session.markDirty(); + } break; case Session.SESSION_TRANSACTED: if (isNoConsume()) @@ -792,15 +768,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { switch (_acknowledgeMode) { - - case Session.CLIENT_ACKNOWLEDGE: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - _session.markDirty(); - break; - case Session.DUPS_OK_ACKNOWLEDGE: case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 47da59724c..d3494298d3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -203,9 +203,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM super.notifyMessage(messageFrame); } - @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + @Override + protected void preDeliver(AbstractJMSMessage jmsMsg) { - super.preApplicationProcessing(jmsMsg); + super.preDeliver(jmsMsg); + if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE) { _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index e95f9a7414..bf4de782a5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -455,16 +455,6 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac AbstractJMSMessage message = convertToNativeMessage(origMessage); - if (_transacted) - { - if (_session.hasFailedOver() && _session.isDirty()) - { - throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.", - new AMQSessionDirtyException("Failover has occurred and session is dirty " + - "so unable to send.")); - } - } - UUID messageId = null; if (_disableMessageId) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 71a3412dc1..6759b43387 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -64,7 +64,12 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe } - public void sendCommit() throws AMQException, FailoverException + public void commitImpl() throws AMQException, FailoverException + { + + } + + public void acknowledgeImpl() { } |
