diff options
Diffstat (limited to 'qpid/java/client')
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 25 | ||||
| -rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 4 |
2 files changed, 15 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 47250c0f60..7ce81aeea2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -290,6 +290,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } _session.setInRecovery(false); + preDeliver(jmsMsg); } /** @@ -686,7 +687,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me // if (!_closed.get()) { - preDeliver(jmsMessage); + //preDeliver(jmsMessage); notifyMessage(jmsMessage); } @@ -768,6 +769,17 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me // to send out the appropriate frame msg.setAMQSession(_session); break; + case Session.SESSION_TRANSACTED: + if (isNoConsume()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } + else + { + _session.addDeliveredMessage(msg.getDeliveryTag()); + } + + break; } } @@ -794,17 +806,6 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } break; - case Session.SESSION_TRANSACTED: - if (isNoConsume()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), false); - } - else - { - _session.addDeliveredMessage(msg.getDeliveryTag()); - } - - break; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 6103d92fd8..9230225bd5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -261,11 +261,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - if (!_session.getTransacted()) + super.preApplicationProcessing(jmsMsg); + if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE) { _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag()); } - _session.setInRecovery(false); } @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage( |
