From 78fa82c7b48711f06c03e176f6e24b70af65e692 Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Fri, 4 Apr 2008 12:02:52 +0000 Subject: QPID-796: Added ability to enable/disable message prefetching. Prefetching is controlled through the property max_prefetch, it is turned off when max_prefetch =0. (this is 0.10 code path change) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@644688 13f79535-47bb-0310-9956-ffa450edef68 --- java/010ExcludeList | 5 +- .../org/apache/qpid/client/AMQSession_0_10.java | 49 +++++++------ .../qpid/client/BasicMessageConsumer_0_10.java | 84 ++++++++++++++++++---- .../org/apache/qpid/client/ClientProperties.java | 36 ++++++++++ java/cpp.async.testprofile | 1 + java/cpp.sync.testprofile | 1 + java/module.xml | 1 + 7 files changed, 141 insertions(+), 36 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpid/client/ClientProperties.java (limited to 'java') diff --git a/java/010ExcludeList b/java/010ExcludeList index 996332afa8..709c846068 100644 --- a/java/010ExcludeList +++ b/java/010ExcludeList @@ -5,5 +5,6 @@ org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber -// this test needs durable subscribe states to be persisted -org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent \ No newline at end of file +// those tests need durable subscribe states to be persisted +org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent +org.apache.qpid.test.unit.ct.DurableSubscriberTests#testDurSubRestoresMessageSelector \ No newline at end of file diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 27feba694c..16d5a07141 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,7 +27,6 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; -import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; @@ -45,7 +44,6 @@ import javax.jms.IllegalStateException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; -import java.util.Iterator; /** * This is a 0.10 Session @@ -58,10 +56,6 @@ public class AMQSession_0_10 extends AMQSession */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); - /** - * The maximum number of pre-fetched messages per destination - */ - public static long MAX_PREFETCH = 1000; /** * The underlying QpidSession @@ -101,8 +95,6 @@ public class AMQSession_0_10 extends AMQSession super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); - MAX_PREFETCH = Integer.parseInt(System.getProperty("max_prefetch","1000")); - // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = qpidConnection.createSession(0); // set the exception listnere for this session @@ -404,18 +396,23 @@ public class AMQSession_0_10 extends AMQSession new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null, consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); - - getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + if (ClientProperties.MAX_PREFETCH == 0) + { + getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + } + else + { + getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); + } getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. // only if not immediat prefetch - if(consumer.isStrated() || _immediatePrefetch) + if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumer.getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - AMQSession_0_10.MAX_PREFETCH); - + ClientProperties.MAX_PREFETCH); } getQpidSession().sync(); getCurrentException(); @@ -517,17 +514,27 @@ public class AMQSession_0_10 extends AMQSession //only set if msg list is null try { - // if (consumer.getMessageListener() != null) - // { - getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, - MAX_PREFETCH); - // } + if (ClientProperties.MAX_PREFETCH == 0) + { + if (consumer.getMessageListener() != null) + { + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + } + } + else + { + getQpidSession() + .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, + ClientProperties.MAX_PREFETCH); + } getQpidSession() - .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); + .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, + 0xFFFFFFFF); } - catch(Exception e) + catch (Exception e) { - throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error while trying to get the listener",e); + throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", e); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 9d24fbf953..c40ec1e5cb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -27,7 +27,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.Session; import org.apache.qpidity.transport.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.filter.MessageFilter; @@ -39,6 +38,7 @@ import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; /** * This is a 0.10 message consumer. @@ -72,6 +72,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer + -- cgit v1.2.1