diff options
| author | Aidan Skinner <aidan@apache.org> | 2009-12-03 23:55:48 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2009-12-03 23:55:48 +0000 |
| commit | de0e81996d63d8a8b7f92d835d2bbdeaf5cccae0 (patch) | |
| tree | 8dbde527e8f656849ed83981359c8dcd2512395c /qpid/dotnet/Qpid.Client.Tests/interop | |
| parent | e769aed97fa119033e38d829be25135a4ea08a52 (diff) | |
| download | qpid-python-de0e81996d63d8a8b7f92d835d2bbdeaf5cccae0.tar.gz | |
Fix eol style property
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@886998 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/Qpid.Client.Tests/interop')
| -rw-r--r-- | qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs | 422 | ||||
| -rw-r--r-- | qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs | 416 |
2 files changed, 419 insertions, 419 deletions
diff --git a/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs b/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs index 13141d52b8..b355abb28d 100644 --- a/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs +++ b/qpid/dotnet/Qpid.Client.Tests/interop/TopicListener.cs @@ -1,211 +1,211 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using log4net;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Client.Qms;
-
-namespace Apache.Qpid.Client.Tests.interop
-{
- public class TopicListener
- {
- private static ILog log = LogManager.GetLogger(typeof(TopicListener));
-
- /// <summary> The default AMQ connection URL to use for tests. </summary>
- const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
-
- /// <summary> Holds the routing key for the topic to receive test messages on. </summary>
- public static string CONTROL_ROUTING_KEY = "topic_control";
-
- /// <summary> Holds the routing key for the queue to send reports to. </summary>
- public static string RESPONSE_ROUTING_KEY = "response";
-
- /// <summary> Holds the connection to listen on. </summary>
- private IConnection connection;
-
- /// <summary> Holds the channel for all test messages.</summary>
- private IChannel channel;
-
- /// <summary> Holds the producer to send report messages on. </summary>
- private IMessagePublisher publisher;
-
- /// <summary> Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. </summary> */
- private bool init;
-
- /// <summary> Holds the count of messages received by this listener. </summary> */
- private int count;
-
- /// <summary> Creates a topic listener using the specified broker URL. </summary>
- ///
- /// <param name="connectionUri">The broker URL to listen on.</param>
- TopicListener(string connectionUri)
- {
- log.Debug("TopicListener(string connectionUri = " + connectionUri + "): called");
-
- // Create a connection to the broker.
- IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
- connection = new AMQConnection(connectionInfo);
-
- // Establish a session on the broker.
- channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
-
- // Set up a queue to listen for test messages on.
- string topicQueueName = channel.GenerateUniqueName();
- channel.DeclareQueue(topicQueueName, false, true, true);
-
- // Set this listener up to listen for incoming messages on the test topic queue.
- channel.Bind(topicQueueName, ExchangeNameDefaults.TOPIC, CONTROL_ROUTING_KEY);
- IMessageConsumer consumer = channel.CreateConsumerBuilder(topicQueueName)
- .Create();
- consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
-
- // Set up this listener with a producer to send the reports on.
- publisher = channel.CreatePublisherBuilder()
- .WithExchangeName(ExchangeNameDefaults.DIRECT)
- .WithRoutingKey(RESPONSE_ROUTING_KEY)
- .Create();
-
- connection.Start();
- Console.WriteLine("Waiting for messages...");
- }
-
- public static void Main(String[] argv)
- {
- // Create an instance of this listener with the command line parameters.
- new TopicListener(DEFAULT_URI);
- }
-
- /// <summary>
- /// Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and
- /// shutdown messages result in this listener being terminated.
- /// </summary>
- ///
- /// <param name="message">The received message.</param>
- public void OnMessage(IMessage message)
- {
- log.Debug("public void onMessage(Message message = " + message + "): called");
-
- // Take the start time of the first message if this is the first message.
- if (!init)
- {
- count = 0;
- init = true;
- }
-
- // Check if the message is a control message telling this listener to shut down.
- if (IsShutdown(message))
- {
- log.Debug("Got a shutdown message.");
- Shutdown();
- }
- // Check if the message is a report request message asking this listener to respond with the message count.
- else if (IsReport(message))
- {
- log.Debug("Got a report request message.");
-
- // Send the message count report.
- SendReport();
-
- // Reset the initialization flag so that the next message is considered to be the first.
- init = false;
- }
- // Otherwise it is an ordinary test message, so increment the message count.
- else
- {
- count++;
- }
- }
-
- /// <summary> Checks a message to see if it is a shutdown control message. </summary>
- ///
- /// <param name="m">The message to check.</param>
- ///
- /// <returns><tt>true</tt> if it is a shutdown control message, <tt>false</tt> otherwise.</returns>
- private bool IsShutdown(IMessage m)
- {
- bool result = CheckTextField(m, "TYPE", "TERMINATION_REQUEST");
-
- //log.Debug("isShutdown = " + result);
-
- return result;
- }
-
- /// <summary> Checks a message to see if it is a report request control message. </summary>
- ///
- /// <param name="m">The message to check.</param>
- ///
- /// <returns><tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.</returns>
- private bool IsReport(IMessage m)
- {
- bool result = CheckTextField(m, "TYPE", "REPORT_REQUEST");
-
- //log.Debug("isReport = " + result);
-
- return result;
- }
-
- /// <summary> Checks whether or not a text field on a message has the specified value. </summary>
- ///
- /// <param name="e">The message to check.</param>
- /// <param name="e">The name of the field to check.</param>
- /// <param name="e">The expected value of the field to compare with.</param>
- ///
- /// <returns> <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. </returns>
- private static bool CheckTextField(IMessage m, string fieldName, string value)
- {
- /*log.Debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
- + ", String value = " + value + "): called");*/
-
- string comp = m.Headers.GetString(fieldName);
-
- return (comp != null) && comp == value;
- }
-
- /// <summary> Stops the message consumer and closes the connection. </summary>
- private void Shutdown()
- {
- connection.Stop();
- channel.Dispose();
- connection.Dispose();
- }
-
- /// <summary> Sends the report message to the response location. </summary>
- private void SendReport()
- {
- string report = "Received " + count + ".";
-
- IMessage reportMessage = channel.CreateTextMessage(report);
-
- reportMessage.Headers.SetBoolean("BOOLEAN", false);
- //reportMessage.Headers.SetByte("BYTE", 5);
- reportMessage.Headers.SetDouble("DOUBLE", 3.141);
- reportMessage.Headers.SetFloat("FLOAT", 1.0f);
- reportMessage.Headers.SetInt("INT", 1);
- reportMessage.Headers.SetLong("LONG", 1);
- reportMessage.Headers.SetString("STRING", "hello");
- reportMessage.Headers.SetShort("SHORT", 2);
-
- publisher.Send(reportMessage);
-
- Console.WriteLine("Sent report: " + report);
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Client.Tests.interop +{ + public class TopicListener + { + private static ILog log = LogManager.GetLogger(typeof(TopicListener)); + + /// <summary> The default AMQ connection URL to use for tests. </summary> + const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'"; + + /// <summary> Holds the routing key for the topic to receive test messages on. </summary> + public static string CONTROL_ROUTING_KEY = "topic_control"; + + /// <summary> Holds the routing key for the queue to send reports to. </summary> + public static string RESPONSE_ROUTING_KEY = "response"; + + /// <summary> Holds the connection to listen on. </summary> + private IConnection connection; + + /// <summary> Holds the channel for all test messages.</summary> + private IChannel channel; + + /// <summary> Holds the producer to send report messages on. </summary> + private IMessagePublisher publisher; + + /// <summary> Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. </summary> */ + private bool init; + + /// <summary> Holds the count of messages received by this listener. </summary> */ + private int count; + + /// <summary> Creates a topic listener using the specified broker URL. </summary> + /// + /// <param name="connectionUri">The broker URL to listen on.</param> + TopicListener(string connectionUri) + { + log.Debug("TopicListener(string connectionUri = " + connectionUri + "): called"); + + // Create a connection to the broker. + IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); + connection = new AMQConnection(connectionInfo); + + // Establish a session on the broker. + channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1); + + // Set up a queue to listen for test messages on. + string topicQueueName = channel.GenerateUniqueName(); + channel.DeclareQueue(topicQueueName, false, true, true); + + // Set this listener up to listen for incoming messages on the test topic queue. + channel.Bind(topicQueueName, ExchangeNameDefaults.TOPIC, CONTROL_ROUTING_KEY); + IMessageConsumer consumer = channel.CreateConsumerBuilder(topicQueueName) + .Create(); + consumer.OnMessage += new MessageReceivedDelegate(OnMessage); + + // Set up this listener with a producer to send the reports on. + publisher = channel.CreatePublisherBuilder() + .WithExchangeName(ExchangeNameDefaults.DIRECT) + .WithRoutingKey(RESPONSE_ROUTING_KEY) + .Create(); + + connection.Start(); + Console.WriteLine("Waiting for messages..."); + } + + public static void Main(String[] argv) + { + // Create an instance of this listener with the command line parameters. + new TopicListener(DEFAULT_URI); + } + + /// <summary> + /// Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and + /// shutdown messages result in this listener being terminated. + /// </summary> + /// + /// <param name="message">The received message.</param> + public void OnMessage(IMessage message) + { + log.Debug("public void onMessage(Message message = " + message + "): called"); + + // Take the start time of the first message if this is the first message. + if (!init) + { + count = 0; + init = true; + } + + // Check if the message is a control message telling this listener to shut down. + if (IsShutdown(message)) + { + log.Debug("Got a shutdown message."); + Shutdown(); + } + // Check if the message is a report request message asking this listener to respond with the message count. + else if (IsReport(message)) + { + log.Debug("Got a report request message."); + + // Send the message count report. + SendReport(); + + // Reset the initialization flag so that the next message is considered to be the first. + init = false; + } + // Otherwise it is an ordinary test message, so increment the message count. + else + { + count++; + } + } + + /// <summary> Checks a message to see if it is a shutdown control message. </summary> + /// + /// <param name="m">The message to check.</param> + /// + /// <returns><tt>true</tt> if it is a shutdown control message, <tt>false</tt> otherwise.</returns> + private bool IsShutdown(IMessage m) + { + bool result = CheckTextField(m, "TYPE", "TERMINATION_REQUEST"); + + //log.Debug("isShutdown = " + result); + + return result; + } + + /// <summary> Checks a message to see if it is a report request control message. </summary> + /// + /// <param name="m">The message to check.</param> + /// + /// <returns><tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.</returns> + private bool IsReport(IMessage m) + { + bool result = CheckTextField(m, "TYPE", "REPORT_REQUEST"); + + //log.Debug("isReport = " + result); + + return result; + } + + /// <summary> Checks whether or not a text field on a message has the specified value. </summary> + /// + /// <param name="e">The message to check.</param> + /// <param name="e">The name of the field to check.</param> + /// <param name="e">The expected value of the field to compare with.</param> + /// + /// <returns> <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. </returns> + private static bool CheckTextField(IMessage m, string fieldName, string value) + { + /*log.Debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName + + ", String value = " + value + "): called");*/ + + string comp = m.Headers.GetString(fieldName); + + return (comp != null) && comp == value; + } + + /// <summary> Stops the message consumer and closes the connection. </summary> + private void Shutdown() + { + connection.Stop(); + channel.Dispose(); + connection.Dispose(); + } + + /// <summary> Sends the report message to the response location. </summary> + private void SendReport() + { + string report = "Received " + count + "."; + + IMessage reportMessage = channel.CreateTextMessage(report); + + reportMessage.Headers.SetBoolean("BOOLEAN", false); + //reportMessage.Headers.SetByte("BYTE", 5); + reportMessage.Headers.SetDouble("DOUBLE", 3.141); + reportMessage.Headers.SetFloat("FLOAT", 1.0f); + reportMessage.Headers.SetInt("INT", 1); + reportMessage.Headers.SetLong("LONG", 1); + reportMessage.Headers.SetString("STRING", "hello"); + reportMessage.Headers.SetShort("SHORT", 2); + + publisher.Send(reportMessage); + + Console.WriteLine("Sent report: " + report); + } + } +} diff --git a/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs b/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs index a96c1f327b..4fd0419e9c 100644 --- a/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs +++ b/qpid/dotnet/Qpid.Client.Tests/interop/TopicPublisher.cs @@ -1,208 +1,208 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Threading;
-using log4net;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Client.Qms;
-
-namespace Apache.Qpid.Client.Tests.interop
-{
- public class TopicPublisher
- {
- private static ILog log = LogManager.GetLogger(typeof(TopicPublisher));
-
- /// <summary> The default AMQ connection URL to use for tests. </summary>
- const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
-
- /// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
- const int TIMEOUT = 10000;
-
- /// <summary> Holds the routing key for the topic to receive test messages on. </summary>
- const string CONTROL_ROUTING_KEY = "topic_control";
-
- /// <summary> Holds the routing key for the queue to send reports to. </summary>
- const string RESPONSE_ROUTING_KEY = "response";
-
- /// <summary> Holds the number of messages to send in each test run. </summary>
- private int numMessages;
-
- /// <summary> Holds the number of subscribers listening to the messsages. </summary>
- private int numSubscribers;
-
- /// <summary> A monitor used to wait for all reports to arrive back from consumers on. </summary>
- private AutoResetEvent allReportsReceivedEvt = new AutoResetEvent(false);
-
- /// <summary> Holds the connection to listen on. </summary>
- private IConnection connection;
-
- /// <summary> Holds the channel for all test messages.</summary>
- private IChannel channel;
-
- /// <summary> Holds the producer to send test messages on. </summary>
- private IMessagePublisher publisher;
-
- /// <summary>
- /// Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test
- /// subscribers.
- /// </summary>
- ///
- /// <param name="connectionUri">The broker URL.</param>
- /// <param name="numMessages">The number of messages to send in each test.</param>
- /// <param name="numSubscribers">The number of subscribes that are expected to reply with a report.</param>
- TopicPublisher(string connectionUri, int numMessages, int numSubscribers)
- {
- log.Debug("TopicPublisher(string connectionUri = " + connectionUri + ", int numMessages = "+ numMessages +
- ", int numSubscribers = " + numSubscribers + "): called");
-
- // Create a connection to the broker.
- IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
- connection = new AMQConnection(connectionInfo);
-
- // Establish a session on the broker.
- channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
-
- // Set up a queue to listen for reports on.
- string responseQueueName = channel.GenerateUniqueName();
- channel.DeclareQueue(responseQueueName, false, true, true);
-
- // Set this listener up to listen for reports on the response queue.
- channel.Bind(responseQueueName, ExchangeNameDefaults.DIRECT, RESPONSE_ROUTING_KEY);
- //channel.Bind(responseQueueName, "<<default>>", RESPONSE_ROUTING_KEY);
- IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName)
- .Create();
- consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
-
- // Set up this listener with a producer to send the test messages and report requests on.
- publisher = channel.CreatePublisherBuilder()
- .WithExchangeName(ExchangeNameDefaults.TOPIC)
- .WithRoutingKey(CONTROL_ROUTING_KEY)
- .Create();
-
- // Keep the test parameters.
- this.numMessages = numMessages;
- this.numSubscribers = numSubscribers;
-
- connection.Start();
- Console.WriteLine("Sending messages and waiting for reports...");
- }
-
- /// <summary>
- /// Start a test subscriber. The broker URL must be specified as the first command line argument.
- /// </summary>
- ///
- /// <param name="argv">The command line arguments, broker URL first.</param>
- public static void Main(String[] argv)
- {
- // Create an instance of this publisher with the command line parameters.
- TopicPublisher publisher = new TopicPublisher(DEFAULT_URI, 1, 1);
-
- // Publish the test messages.
- publisher.DoTest();
- }
-
- /// <summary>
- /// Sends the test messages and waits for all subscribers to reply with a report.
- /// </summary>
- public void DoTest()
- {
- log.Debug("public void DoTest(): called");
-
- // Create a test message to send.
- IMessage testMessage = channel.CreateTextMessage("test");
-
- // Send the desired number of test messages.
- for (int i = 0; i < numMessages; i++)
- {
- publisher.Send(testMessage);
- }
-
- log.Debug("Sent " + numMessages + " test messages.");
-
- // Send the report request.
- IMessage reportRequestMessage = channel.CreateTextMessage("Report request message.");
- reportRequestMessage.Headers["TYPE"] = "REPORT_REQUEST";
-
- reportRequestMessage.Headers.SetBoolean("BOOLEAN", false);
- //reportRequestMessage.Headers.SetByte("BYTE", 5);
- reportRequestMessage.Headers.SetDouble("DOUBLE", 3.141);
- reportRequestMessage.Headers.SetFloat("FLOAT", 1.0f);
- reportRequestMessage.Headers.SetInt("INT", 1);
- reportRequestMessage.Headers.SetLong("LONG", 1);
- reportRequestMessage.Headers.SetString("STRING", "hello");
- reportRequestMessage.Headers.SetShort("SHORT", 2);
-
- publisher.Send(reportRequestMessage);
-
- log.Debug("Sent the report request message, waiting for all replies...");
-
- // Wait until all the reports come in.
- allReportsReceivedEvt.WaitOne(TIMEOUT, true);
-
- // Check if all reports were really received or if the timeout occurred.
- if (numSubscribers == 0)
- {
- log.Debug("Got all reports.");
- }
- else
- {
- log.Debug("Waiting for reports timed out, still waiting for " + numSubscribers + ".");
- }
-
- // Send the termination request.
- IMessage terminationRequestMessage = channel.CreateTextMessage("Termination request message.");
- terminationRequestMessage.Headers["TYPE"] = "TERMINATION_REQUEST";
- publisher.Send(terminationRequestMessage);
-
- log.Debug("Sent the termination request message.");
-
- // Close all message producers and consumers and the connection to the broker.
- Shutdown();
- }
-
- /// <summary>
- /// Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes
- /// zero, at which time waiting threads are notified of this event.
- /// </summary>
- ///
- /// <param name="message">The received report message.</param>
- public void OnMessage(IMessage message)
- {
- log.Debug("public void OnMessage(IMessage message = " + message + "): called");
-
- // Decrement the count of expected messages and release the wait monitor when this becomes zero.
- if (--numSubscribers == 0)
- {
- log.Debug("Got reports from all subscribers.");
- allReportsReceivedEvt.Set();
- }
- }
-
- /// <summary> Stops the message consumers and closes the connection. </summary>
- private void Shutdown()
- {
- connection.Stop();
- publisher.Dispose();
- channel.Dispose();
- connection.Dispose();
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Apache.Qpid.Messaging; +using Apache.Qpid.Client.Qms; + +namespace Apache.Qpid.Client.Tests.interop +{ + public class TopicPublisher + { + private static ILog log = LogManager.GetLogger(typeof(TopicPublisher)); + + /// <summary> The default AMQ connection URL to use for tests. </summary> + const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'"; + + /// <summary> Holds the default test timeout for broker communications before tests give up. </summary> + const int TIMEOUT = 10000; + + /// <summary> Holds the routing key for the topic to receive test messages on. </summary> + const string CONTROL_ROUTING_KEY = "topic_control"; + + /// <summary> Holds the routing key for the queue to send reports to. </summary> + const string RESPONSE_ROUTING_KEY = "response"; + + /// <summary> Holds the number of messages to send in each test run. </summary> + private int numMessages; + + /// <summary> Holds the number of subscribers listening to the messsages. </summary> + private int numSubscribers; + + /// <summary> A monitor used to wait for all reports to arrive back from consumers on. </summary> + private AutoResetEvent allReportsReceivedEvt = new AutoResetEvent(false); + + /// <summary> Holds the connection to listen on. </summary> + private IConnection connection; + + /// <summary> Holds the channel for all test messages.</summary> + private IChannel channel; + + /// <summary> Holds the producer to send test messages on. </summary> + private IMessagePublisher publisher; + + /// <summary> + /// Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test + /// subscribers. + /// </summary> + /// + /// <param name="connectionUri">The broker URL.</param> + /// <param name="numMessages">The number of messages to send in each test.</param> + /// <param name="numSubscribers">The number of subscribes that are expected to reply with a report.</param> + TopicPublisher(string connectionUri, int numMessages, int numSubscribers) + { + log.Debug("TopicPublisher(string connectionUri = " + connectionUri + ", int numMessages = "+ numMessages + + ", int numSubscribers = " + numSubscribers + "): called"); + + // Create a connection to the broker. + IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri); + connection = new AMQConnection(connectionInfo); + + // Establish a session on the broker. + channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1); + + // Set up a queue to listen for reports on. + string responseQueueName = channel.GenerateUniqueName(); + channel.DeclareQueue(responseQueueName, false, true, true); + + // Set this listener up to listen for reports on the response queue. + channel.Bind(responseQueueName, ExchangeNameDefaults.DIRECT, RESPONSE_ROUTING_KEY); + //channel.Bind(responseQueueName, "<<default>>", RESPONSE_ROUTING_KEY); + IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName) + .Create(); + consumer.OnMessage += new MessageReceivedDelegate(OnMessage); + + // Set up this listener with a producer to send the test messages and report requests on. + publisher = channel.CreatePublisherBuilder() + .WithExchangeName(ExchangeNameDefaults.TOPIC) + .WithRoutingKey(CONTROL_ROUTING_KEY) + .Create(); + + // Keep the test parameters. + this.numMessages = numMessages; + this.numSubscribers = numSubscribers; + + connection.Start(); + Console.WriteLine("Sending messages and waiting for reports..."); + } + + /// <summary> + /// Start a test subscriber. The broker URL must be specified as the first command line argument. + /// </summary> + /// + /// <param name="argv">The command line arguments, broker URL first.</param> + public static void Main(String[] argv) + { + // Create an instance of this publisher with the command line parameters. + TopicPublisher publisher = new TopicPublisher(DEFAULT_URI, 1, 1); + + // Publish the test messages. + publisher.DoTest(); + } + + /// <summary> + /// Sends the test messages and waits for all subscribers to reply with a report. + /// </summary> + public void DoTest() + { + log.Debug("public void DoTest(): called"); + + // Create a test message to send. + IMessage testMessage = channel.CreateTextMessage("test"); + + // Send the desired number of test messages. + for (int i = 0; i < numMessages; i++) + { + publisher.Send(testMessage); + } + + log.Debug("Sent " + numMessages + " test messages."); + + // Send the report request. + IMessage reportRequestMessage = channel.CreateTextMessage("Report request message."); + reportRequestMessage.Headers["TYPE"] = "REPORT_REQUEST"; + + reportRequestMessage.Headers.SetBoolean("BOOLEAN", false); + //reportRequestMessage.Headers.SetByte("BYTE", 5); + reportRequestMessage.Headers.SetDouble("DOUBLE", 3.141); + reportRequestMessage.Headers.SetFloat("FLOAT", 1.0f); + reportRequestMessage.Headers.SetInt("INT", 1); + reportRequestMessage.Headers.SetLong("LONG", 1); + reportRequestMessage.Headers.SetString("STRING", "hello"); + reportRequestMessage.Headers.SetShort("SHORT", 2); + + publisher.Send(reportRequestMessage); + + log.Debug("Sent the report request message, waiting for all replies..."); + + // Wait until all the reports come in. + allReportsReceivedEvt.WaitOne(TIMEOUT, true); + + // Check if all reports were really received or if the timeout occurred. + if (numSubscribers == 0) + { + log.Debug("Got all reports."); + } + else + { + log.Debug("Waiting for reports timed out, still waiting for " + numSubscribers + "."); + } + + // Send the termination request. + IMessage terminationRequestMessage = channel.CreateTextMessage("Termination request message."); + terminationRequestMessage.Headers["TYPE"] = "TERMINATION_REQUEST"; + publisher.Send(terminationRequestMessage); + + log.Debug("Sent the termination request message."); + + // Close all message producers and consumers and the connection to the broker. + Shutdown(); + } + + /// <summary> + /// Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes + /// zero, at which time waiting threads are notified of this event. + /// </summary> + /// + /// <param name="message">The received report message.</param> + public void OnMessage(IMessage message) + { + log.Debug("public void OnMessage(IMessage message = " + message + "): called"); + + // Decrement the count of expected messages and release the wait monitor when this becomes zero. + if (--numSubscribers == 0) + { + log.Debug("Got reports from all subscribers."); + allReportsReceivedEvt.Set(); + } + } + + /// <summary> Stops the message consumers and closes the connection. </summary> + private void Shutdown() + { + connection.Stop(); + publisher.Dispose(); + channel.Dispose(); + connection.Dispose(); + } + } +} |
