summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-10-03 09:32:22 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-10-03 09:32:22 +0000
commitff3aa7b61a1b7454e15958ae03a7f09e687dd754 (patch)
tree01c7a6fa2831abc6a88cb3a7bc70ca6bb1870e12 /java/client/src/main
parent87cf93aba115c215cd0ca16e9f944e96d43a863b (diff)
downloadqpid-python-ff3aa7b61a1b7454e15958ae03a7f09e687dd754.tar.gz
QPID-617 : Transactional consume does not ack messages.
Problem is that we were not classing msgs as consumed until onMessage completed in the transactional case. This patch corrects that. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@581540 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java34
1 files changed, 19 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 014fd36414..4ac77f1374 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -250,12 +250,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ private void preApplicationProcessing(AbstractJMSMessage msg) throws JMSException
{
- if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ switch (_acknowledgeMode)
{
- _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
+
+ case Session.CLIENT_ACKNOWLEDGE:
+ _unacknowledgedDeliveryTags.add(msg.getDeliveryTag());
+ break;
+
+ case Session.SESSION_TRANSACTED:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _logger.info("Recording tag for commit:" + msg.getDeliveryTag());
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
+ }
+
+ break;
}
_session.setInRecovery(false);
@@ -714,18 +730,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
break;
-
- case Session.SESSION_TRANSACTED:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- else
- {
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- }
-
- break;
}
}