From 33c04c7e619a65e2d92ac231805e8ad27f4a29c2 Mon Sep 17 00:00:00 2001 From: Steven Shaw Date: Thu, 30 Nov 2006 18:54:48 +0000 Subject: QPID-136 Ported Prefetch with PrefetchHigh and PrefetchLow QPID-137 Ported AcknowledgeModes git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481035 13f79535-47bb-0310-9956-ffa450edef68 --- dotnet/Qpid.Client/Client/AmqChannel.cs | 35 +++++++++++++++++---------------- 1 file changed, 18 insertions(+), 17 deletions(-) (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs') diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs index 0ab3fd3411..2ffd6505c6 100644 --- a/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -43,7 +43,7 @@ namespace Qpid.Client // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. private int _nextConsumerNumber = 1; - internal const int DEFAULT_PREFETCH = 5000; + internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH; private AMQConnection _connection; @@ -273,6 +273,7 @@ namespace Qpid.Client public void Commit() { + // FIXME: Fail over safety. Needs FailoverSupport? CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session @@ -297,6 +298,7 @@ namespace Qpid.Client public void Rollback() { + // FIXME: Fail over safety. Needs FailoverSupport? CheckNotClosed(); CheckTransacted(); // throws IllegalOperationException if not a transacted session @@ -489,25 +491,26 @@ namespace Qpid.Client } public IMessageConsumer CreateConsumer(string queueName, - int prefetch, + int prefetchLow, + int prefetchHigh, bool noLocal, bool exclusive, bool durable, string subscriptionName) { - _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}", - queueName, prefetch, noLocal, exclusive, durable, subscriptionName)); - return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName); + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName)); + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName); } private IMessageConsumer CreateConsumerImpl(string queueName, - int prefetch, - bool noLocal, - bool exclusive, - bool durable, - string subscriptionName) + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive, + bool durable, + string subscriptionName) { - if (durable || subscriptionName != null) { throw new NotImplementedException(); // TODO: durable subscriptions. @@ -518,7 +521,8 @@ namespace Qpid.Client CheckNotClosed(); BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, - _messageFactoryRegistry, this); + _messageFactoryRegistry, this, + prefetchHigh, prefetchLow, exclusive); try { RegisterConsumer(consumer); @@ -710,9 +714,8 @@ namespace Qpid.Client /// void RegisterConsumer(BasicMessageConsumer consumer) { - String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch, consumer.NoLocal, + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, consumer.Exclusive, consumer.AcknowledgeMode); - consumer.ConsumerTag = consumerTag; _consumers.Add(consumerTag, consumer); } @@ -744,8 +747,7 @@ namespace Qpid.Client } } - private String ConsumeFromQueue(String queueName, int prefetch, - bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) { // Need to generate a consumer tag on the client so we can exploit the nowait flag. String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); @@ -973,7 +975,6 @@ namespace Qpid.Client public void AcknowledgeMessage(ulong deliveryTag, bool multiple) { AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); - _logger.Info("XXX sending ack: " + ackFrame); if (_logger.IsDebugEnabled) { _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); -- cgit v1.2.1