summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/AmqChannel.cs
diff options
context:
space:
mode:
authorSteven Shaw <steshaw@apache.org>2006-11-30 18:54:48 +0000
committerSteven Shaw <steshaw@apache.org>2006-11-30 18:54:48 +0000
commit33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (patch)
tree1fdc64001d5e0bf1f34883927d7901b456b7bd3b /dotnet/Qpid.Client/Client/AmqChannel.cs
parent8f21f5d6cacd35e6fe04a0b4a5567fc4929f997e (diff)
downloadqpid-python-33c04c7e619a65e2d92ac231805e8ad27f4a29c2.tar.gz
QPID-136 Ported Prefetch with PrefetchHigh and PrefetchLow
QPID-137 Ported AcknowledgeModes git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481035 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/AmqChannel.cs')
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs35
1 files changed, 18 insertions, 17 deletions
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
index 0ab3fd3411..2ffd6505c6 100644
--- a/dotnet/Qpid.Client/Client/AmqChannel.cs
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -43,7 +43,7 @@ namespace Qpid.Client
// Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
private int _nextConsumerNumber = 1;
- internal const int DEFAULT_PREFETCH = 5000;
+ internal const int DEFAULT_PREFETCH = MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH;
private AMQConnection _connection;
@@ -273,6 +273,7 @@ namespace Qpid.Client
public void Commit()
{
+ // FIXME: Fail over safety. Needs FailoverSupport?
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a transacted session
@@ -297,6 +298,7 @@ namespace Qpid.Client
public void Rollback()
{
+ // FIXME: Fail over safety. Needs FailoverSupport?
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a transacted session
@@ -489,25 +491,26 @@ namespace Qpid.Client
}
public IMessageConsumer CreateConsumer(string queueName,
- int prefetch,
+ int prefetchLow,
+ int prefetchHigh,
bool noLocal,
bool exclusive,
bool durable,
string subscriptionName)
{
- _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}",
- queueName, prefetch, noLocal, exclusive, durable, subscriptionName));
- return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName);
+ _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}",
+ queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName));
+ return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName);
}
private IMessageConsumer CreateConsumerImpl(string queueName,
- int prefetch,
- bool noLocal,
- bool exclusive,
- bool durable,
- string subscriptionName)
+ int prefetchLow,
+ int prefetchHigh,
+ bool noLocal,
+ bool exclusive,
+ bool durable,
+ string subscriptionName)
{
-
if (durable || subscriptionName != null)
{
throw new NotImplementedException(); // TODO: durable subscriptions.
@@ -518,7 +521,8 @@ namespace Qpid.Client
CheckNotClosed();
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal,
- _messageFactoryRegistry, this);
+ _messageFactoryRegistry, this,
+ prefetchHigh, prefetchLow, exclusive);
try
{
RegisterConsumer(consumer);
@@ -710,9 +714,8 @@ namespace Qpid.Client
/// <param name="consumer"></param>
void RegisterConsumer(BasicMessageConsumer consumer)
{
- String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch, consumer.NoLocal,
+ String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
consumer.Exclusive, consumer.AcknowledgeMode);
-
consumer.ConsumerTag = consumerTag;
_consumers.Add(consumerTag, consumer);
}
@@ -744,8 +747,7 @@ namespace Qpid.Client
}
}
- private String ConsumeFromQueue(String queueName, int prefetch,
- bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
{
// Need to generate a consumer tag on the client so we can exploit the nowait flag.
String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
@@ -973,7 +975,6 @@ namespace Qpid.Client
public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
{
AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple);
- _logger.Info("XXX sending ack: " + ackFrame);
if (_logger.IsDebugEnabled)
{
_logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);