From cccdaa4e8af53d73a7303d07569ed86953441cce Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Tue, 3 Jun 2008 07:01:54 +0000 Subject: QPID-1112: Added sessionCompleted support and changed onMessage for invoking sessionCompleted when all expected messages have been received. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@662665 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/client/BasicMessageConsumer.java | 3 +-- .../qpid/client/BasicMessageConsumer_0_10.java | 26 +++++++++++++++++++++- .../java/org/apache/qpidity/nclient/Session.java | 11 +++++++++ .../apache/qpidity/nclient/impl/ClientSession.java | 5 +++++ 4 files changed, 42 insertions(+), 3 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 1741903bb8..c4245d4fc8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -38,7 +38,6 @@ import javax.jms.MessageListener; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.SortedSet; import java.util.ArrayList; import java.util.Collections; @@ -116,7 +115,7 @@ public abstract class BasicMessageConsumer extends Closeable implements Me * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our * implementation. */ - private final int _acknowledgeMode; + protected final int _acknowledgeMode; /** * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index f050cbe455..c47aee0410 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -77,6 +77,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer= getSession().getAMQConnection().getMaxPrefetch() ) + { + RangeSet r = new RangeSet(); + r.add(_firstMessageToComplete, message.getMessageTransferId()); + _0_10session.getQpidSession().sessionCompleted(r, Option.TIMELY_REPLY); + _numberReceivedMessages = 0; + } + } + int channelId = getSession().getChannelId(); long deliveryId = message.getMessageTransferId(); AMQShortString consumerTag = getConsumerTag(); diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index 65b3685f86..28218e01d6 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -65,6 +65,16 @@ public interface Session public void sessionDetach(byte[] name); + /** + * This control is sent by the receiver of commands, and handled by the sender + * of commands. It informs the sender of all commands completed by the receiver. + * This excludes commands known by the receiver to be considered complete at the sender. + * + * @param commands completed commands. + * @param options {@link Option#TIMELY_REPLY} If set, the sender is no longer free to delay the known-completed reply. + */ + public void sessionCompleted(RangeSet commands, Option... options); + public void sessionRequestTimeout(long expiry); public byte[] getName(); @@ -103,6 +113,7 @@ public interface Session public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException; + /** *

This transfer streams a complete message using a single method. * It uses pull-semantics instead of doing a push.

diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index f1701a6b38..58ffffb12b 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -105,6 +105,11 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen _currentDataSizeNotSynced = 0; } + public void sessionCompleted(RangeSet commands, Option ... options) + { + super.sessionCompleted(commands, options); + } + /* ------------------------- * Data methods * ------------------------*/ -- cgit v1.2.1