From 19b863e9fe0ddab32166b2f4b200b9199b079309 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 23 Jan 2010 23:17:58 +0000 Subject: QPID-2321 : Add queue browsing capability to 0-8 .net client to allow for use of LVQ git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@902505 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/dotnet/Qpid.Client/Client/AmqChannel.cs | 63 +++++++++-- .../Qpid.Client/Client/BasicMessageConsumer.cs | 10 +- .../testcases/BaseMessagingTestFixture.cs | 25 ++++- .../testcases/Qpid.Integration.Tests.csproj | 1 + .../testcases/QueueBrowsingTest.cs | 121 +++++++++++++++++++++ qpid/dotnet/Qpid.Messaging/IChannel.cs | 28 +++++ .../Qpid.Messaging/MessageConsumerBuilder.cs | 10 +- 7 files changed, 243 insertions(+), 15 deletions(-) create mode 100644 qpid/dotnet/Qpid.Integration.Tests/testcases/QueueBrowsingTest.cs diff --git a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs index 86dc9a4681..84c7c06fe1 100644 --- a/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs +++ b/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -251,7 +251,20 @@ namespace Apache.Qpid.Client /// True if the queue should be deleted when the channel closes public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) { - DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, null); + } + + /// + /// Declare a new queue with the specified set of arguments. + /// + /// Name of the queue + /// True if the queue should be durable + /// True if the queue should be exclusive to this channel + /// True if the queue should be deleted when the channel closes + /// Optional arguments to Queue.Declare + public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args) + { + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete, args); } /// @@ -386,9 +399,33 @@ namespace Apache.Qpid.Client _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); - return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive); + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, false); + } + + /// + /// Creates a new consumer. + /// + /// Name of queue to receive messages from + /// Low prefetch value + /// High prefetch value + /// If true, messages sent on this channel will not be received by this consumer + /// If true, the consumer opens the queue in exclusive mode + /// If true, the consumer only browses and does not consume messages + /// The new consumer + public IMessageConsumer CreateConsumer(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive, + bool browse) + { + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} browse={5}", + queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse)); + + return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, browse); } + /// /// Unsubscribe from a queue. /// @@ -712,7 +749,8 @@ namespace Apache.Qpid.Client int prefetchLow, int prefetchHigh, bool noLocal, - bool exclusive) + bool exclusive, + bool browse) { lock (_closingLock) { @@ -720,7 +758,8 @@ namespace Apache.Qpid.Client BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, _messageFactoryRegistry, this, - prefetchHigh, prefetchLow, exclusive); + prefetchHigh, prefetchLow, exclusive, + browse); try { RegisterConsumer(consumer); @@ -894,7 +933,7 @@ namespace Apache.Qpid.Client _consumers.Add(tag, consumer); String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, - consumer.Exclusive, consumer.AcknowledgeMode, tag); + consumer.Exclusive, consumer.AcknowledgeMode, tag, consumer.Browse); } @@ -919,13 +958,17 @@ namespace Apache.Qpid.Client routingKey, true, args)); } - private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag) + private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag, bool browse) { - + FieldTable args = new FieldTable(); + if(browse) + { + args["x-filter-no-consume"] = true; + } AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, queueName, tag, noLocal, acknowledgeMode == AcknowledgeMode.NoAcknowledge, - exclusive, true, new FieldTable()); + exclusive, true, args); _replayFrames.Add(basicConsume); @@ -958,13 +1001,13 @@ namespace Apache.Qpid.Client } } - private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args) { _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", queueName, isDurable, isExclusive, isAutoDelete)); AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, - isAutoDelete, false, null); + isAutoDelete, false, (FieldTable) args); lock (_connection.FailoverMutex) diff --git a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs index 6fee316cb4..fdac5e75f2 100644 --- a/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs +++ b/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -44,6 +44,13 @@ namespace Apache.Qpid.Client get { return _exclusive; } } + private bool _browse; + + public bool Browse + { + get { return _browse; } + } + public bool NoLocal { get { return _noLocal; } @@ -131,7 +138,7 @@ namespace Apache.Qpid.Client internal BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, MessageFactoryRegistry messageFactory, AmqChannel channel, - int prefetchHigh, int prefetchLow, bool exclusive) + int prefetchHigh, int prefetchLow, bool exclusive, bool browse) { _channelId = channelId; _queueName = queueName; @@ -142,6 +149,7 @@ namespace Apache.Qpid.Client _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; + _browse = browse; if (_acknowledgeMode == AcknowledgeMode.SessionTransacted) { diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs index 4c82dbe08c..e67d96f188 100644 --- a/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs @@ -89,7 +89,7 @@ namespace Apache.Qpid.Integration.Tests.testcases { log.Debug("public virtual void Shutdown(): called"); } - + /// Sets up the nth test end-point. /// /// The index of the test end-point to set up. @@ -104,6 +104,25 @@ namespace Apache.Qpid.Integration.Tests.testcases /// If durable is true, the fixed unique queue name to use. public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted, string exchangeName, bool declareBind, bool durable, string subscriptionName) + { + SetUpEndPoint(n, producer, consumer, routingKey, ackMode, transacted, exchangeName, declareBind, durable, subscriptionName, true, false); + } + /// Sets up the nth test end-point. + /// + /// The index of the test end-point to set up. + /// true to set up a producer on the end-point. + /// true to set up a consumer on the end-point. + /// The routing key for the producer to send on. + /// The ack mode for the end-points channel. + /// true to use transactions on the end-points channel. + /// The exchange to produce or consume on. + /// true if the consumers queue should be declared and bound, false if it has already been. + /// true to declare the consumers queue as durable. + /// If durable is true, the fixed unique queue name to use. + /// true declare queue as exclusive. + /// true only browse, don''t consume. + public void SetUpEndPoint(int n, bool producer, bool consumer, string routingKey, AcknowledgeMode ackMode, bool transacted, + string exchangeName, bool declareBind, bool durable, string subscriptionName, bool exclusive, bool browse) { // Allow client id to be fixed, or undefined. { @@ -137,7 +156,7 @@ namespace Apache.Qpid.Integration.Tests.testcases if (declareBind) { - testChannel[n].DeclareQueue(queueName, durable, true, false); + testChannel[n].DeclareQueue(queueName, durable, exclusive, false); testChannel[n].Bind(queueName, exchangeName, routingKey); } } @@ -156,7 +175,7 @@ namespace Apache.Qpid.Integration.Tests.testcases } } - testConsumer[n] = testChannel[n].CreateConsumerBuilder(queueName).Create(); + testConsumer[n] = testChannel[n].CreateConsumerBuilder(queueName).WithBrowse(browse).Create(); } } diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj b/qpid/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj index 1df37f6c1b..01ca2cc5bd 100755 --- a/qpid/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/Qpid.Integration.Tests.csproj @@ -59,5 +59,6 @@ + diff --git a/qpid/dotnet/Qpid.Integration.Tests/testcases/QueueBrowsingTest.cs b/qpid/dotnet/Qpid.Integration.Tests/testcases/QueueBrowsingTest.cs new file mode 100644 index 0000000000..2b77063342 --- /dev/null +++ b/qpid/dotnet/Qpid.Integration.Tests/testcases/QueueBrowsingTest.cs @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Integration.Tests.testcases +{ + [TestFixture, Category("Integration")] + public class QueueBrowsingTest : BaseMessagingTestFixture + { + /// Used for debugging purposes. + private static ILog log = LogManager.GetLogger(typeof(QueueBrowsingTest)); + + public const string TEST_ROUTING_KEY = "queuebrowsingkey"; + public const string TEST_ROUTING_KEY2 = "lvqbrowsingkey"; + + + [SetUp] + public override void Init() + { + base.Init(); + } + + [TearDown] + public override void Shutdown() + { + base.Shutdown(); + } + + [Test] + public void TestQueueBrowsing() + { + // Create a topic with one producer and two consumers. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, null, false, false); + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.NoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY + testId, false, true); + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.NoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY + testId, false, true); + + Thread.Sleep(500); + + // Send messages and receive on both consumers. + testProducer[0].Send(testChannel[0].CreateTextMessage("msg")); + testProducer[0].Send(testChannel[0].CreateTextMessage("msg")); + testProducer[0].Send(testChannel[0].CreateTextMessage("msg")); + testProducer[0].Send(testChannel[0].CreateTextMessage("msg")); + testProducer[0].Send(testChannel[0].CreateTextMessage("msg")); + testProducer[0].Send(testChannel[0].CreateTextMessage("msg")); + + Thread.Sleep(2000); + + + ConsumeNMessagesOnly(6, "msg", testConsumer[1]); + ConsumeNMessagesOnly(6, "msg", testConsumer[2]); + + // Clean up any open consumers at the end of the test. + CloseEndPoint(2); + CloseEndPoint(1); + CloseEndPoint(0); + } + + [Test] + public void TestQueueBrowsingLVQ() + { + // Create a topic with one producer and two consumers. + SetUpEndPoint(0, true, false, TEST_ROUTING_KEY2 + testId, AcknowledgeMode.AutoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY2 + testId, false, false); + FieldTable args = new FieldTable(); + args.SetBoolean("qpid.last_value_queue", true); + args.SetString("qpid.last_value_queue_key", "key"); + testChannel[0].DeclareQueue(TEST_ROUTING_KEY2 + testId, true, false, false, args); + testChannel[0].Bind(TEST_ROUTING_KEY2 + testId, ExchangeNameDefaults.DIRECT, TEST_ROUTING_KEY2 + testId); + Thread.Sleep(500); + + + for (int i = 0; i < 12; i++) + { + ITextMessage msg = testChannel[0].CreateTextMessage("msg"); + msg.Headers.SetInt("key", i%6); + testProducer[0].Send(msg); + } + + Thread.Sleep(2000); + + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY2 + testId, AcknowledgeMode.NoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY2 + testId, false, true); + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY2 + testId, AcknowledgeMode.NoAcknowledge, false, ExchangeNameDefaults.DIRECT, true, true, TEST_ROUTING_KEY2 + testId, false, true); + + Thread.Sleep(500); + + + ConsumeNMessagesOnly(6, "msg", testConsumer[1]); + ConsumeNMessagesOnly(6, "msg", testConsumer[2]); + + // Clean up any open consumers at the end of the test. + CloseEndPoint(2); + CloseEndPoint(1); + CloseEndPoint(0); + } + + } +} diff --git a/qpid/dotnet/Qpid.Messaging/IChannel.cs b/qpid/dotnet/Qpid.Messaging/IChannel.cs index 461867b34a..1db8b5fbdb 100644 --- a/qpid/dotnet/Qpid.Messaging/IChannel.cs +++ b/qpid/dotnet/Qpid.Messaging/IChannel.cs @@ -97,6 +97,17 @@ namespace Apache.Qpid.Messaging /// True if the queue should be deleted when the channel closes void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete); + + /// + /// Declare a new queue with the specified set of arguments. + /// + /// Name of the queue + /// True if the queue should be durable + /// True if the queue should be exclusive to this channel + /// True if the queue should be deleted when the channel closes + /// Optional arguments to Queue.Declare + void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete, IFieldTable args); + /// /// Delete a queue with the specifies arguments. /// @@ -191,6 +202,23 @@ namespace Apache.Qpid.Messaging int prefetchHigh, bool noLocal, bool exclusive); + + /// + /// Creates a new consumer. + /// + /// Name of queue to receive messages from + /// Low prefetch value + /// High prefetch value + /// If true, messages sent on this channel will not be received by this consumer + /// If true, the consumer opens the queue in exclusive mode + /// If true, the consumer only browses and does not consume + /// The new consumer + IMessageConsumer CreateConsumer(string queueName, + int prefetchLow, + int prefetchHigh, + bool noLocal, + bool exclusive, + bool browse); /// /// Unsubscribe from a queue. diff --git a/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs b/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs index fbf94d7c27..91a2371788 100644 --- a/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs +++ b/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs @@ -38,6 +38,8 @@ namespace Apache.Qpid.Messaging private bool _exclusive = false; + private bool _browse = false; + //private bool _durable = false; //private string _subscriptionName = null; @@ -81,6 +83,12 @@ namespace Apache.Qpid.Messaging return this; } + public MessageConsumerBuilder WithBrowse(bool browse) + { + _browse = browse; + return this; + } + /* public MessageConsumerBuilder WithDurable(bool durable) { @@ -99,7 +107,7 @@ namespace Apache.Qpid.Messaging public IMessageConsumer Create() { - return _channel.CreateConsumer(_queueName, _prefetchLow, _prefetchHigh, _noLocal, _exclusive); + return _channel.CreateConsumer(_queueName, _prefetchLow, _prefetchHigh, _noLocal, _exclusive, _browse); } } } -- cgit v1.2.1