diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-10-03 09:32:22 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-10-03 09:32:22 +0000 |
| commit | ff3aa7b61a1b7454e15958ae03a7f09e687dd754 (patch) | |
| tree | 01c7a6fa2831abc6a88cb3a7bc70ca6bb1870e12 /java/client/src/main | |
| parent | 87cf93aba115c215cd0ca16e9f944e96d43a863b (diff) | |
| download | qpid-python-ff3aa7b61a1b7454e15958ae03a7f09e687dd754.tar.gz | |
QPID-617 : Transactional consume does not ack messages.
Problem is that we were not classing msgs as consumed until onMessage completed in the transactional case. This patch corrects that.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@581540 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 34 |
1 files changed, 19 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 014fd36414..4ac77f1374 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -250,12 +250,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + private void preApplicationProcessing(AbstractJMSMessage msg) throws JMSException { - if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + switch (_acknowledgeMode) { - _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); + + case Session.CLIENT_ACKNOWLEDGE: + _unacknowledgedDeliveryTags.add(msg.getDeliveryTag()); + break; + + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _logger.info("Recording tag for commit:" + msg.getDeliveryTag()); + _receivedDeliveryTags.add(msg.getDeliveryTag()); + } + + break; } _session.setInRecovery(false); @@ -714,18 +730,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } break; - - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _receivedDeliveryTags.add(msg.getDeliveryTag()); - } - - break; } } |
