From f471422a7b736bbd074649ec1fa8e8e036978abc Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 26 Oct 2012 08:44:43 +0000 Subject: QPID-4395 : Fix client ack in onMessage in JMS AMQP 1.0 client git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1402430 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java | 4 ++-- .../java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 10 +++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) (limited to 'java') diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index e0402cd0a7..eb34cead08 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -316,9 +316,9 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi _lastUnackedMessage = deliveryTag; } - void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) throws IllegalStateException + void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) { - final int acknowledgeMode = _session.getAcknowledgeMode(); + int acknowledgeMode = _session.getAckModeEnum().ordinal(); if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index d14aeedf23..cedf9a180a 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -804,6 +804,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession if(message != null) { + if(_acknowledgeMode == AcknowledgeMode.CLIENT_ACKNOWLEDGE) + { + consumer.setLastUnackedMessage(msg.getDeliveryTag()); + } _currentConsumer = consumer; _currentMessage = msg; try @@ -816,11 +820,11 @@ public class SessionImpl implements Session, QueueSession, TopicSession _currentMessage = null; } - if((_recoveredMessage == null) && (_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE - || _acknowledgeMode == AcknowledgeMode.DUPS_OK_ACKNOWLEDGE)) + if(_recoveredMessage == null) { - consumer.acknowledge(msg); + consumer.preReceiveAction(msg); } + } } -- cgit v1.2.1