diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2008-01-15 17:29:41 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2008-01-15 17:29:41 +0000 |
| commit | 414e3cbb38fd3ce5dd605f65b684e1d4a127aa5a (patch) | |
| tree | 5e748153015aa3725997e29ca98e53b7d6a9afae /dotnet/Qpid.Integration.Tests | |
| parent | c23f086cebae13d9785ba8572dabadb4eb4d9402 (diff) | |
| download | qpid-python-414e3cbb38fd3ce5dd605f65b684e1d4a127aa5a.tar.gz | |
Qpid-491 Integration and pure unit test types have been split into seperate modules. Executable for integration style tests not created, and may not be, but seperate target in the nant build has been created, so there is at least a way to run all integration style tests from one place.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@612165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Integration.Tests')
19 files changed, 2948 insertions, 0 deletions
diff --git a/dotnet/Qpid.Integration.Tests/README.txt b/dotnet/Qpid.Integration.Tests/README.txt new file mode 100644 index 0000000000..389e3b2c6c --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/README.txt @@ -0,0 +1,3 @@ +This directory contains NUnit tests, which are 'integration' oriented. These differ
+from 'pure unit' tests which run against the code only. The integration tests require
+a broker to be available to connect to, and apply test cases that interact with it.
diff --git a/dotnet/Qpid.Integration.Tests/default.build b/dotnet/Qpid.Integration.Tests/default.build new file mode 100644 index 0000000000..6f3de7e7b8 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/default.build @@ -0,0 +1,47 @@ +<?xml version="1.0"?>
+<project name="Apache.Qpid.Integration.Tests" default="test">
+
+ <!-- Creates a .dll for this module. -->
+ <target name="build">
+
+ <csc target="library"
+ define="${build.defines}"
+ warnaserror="false"
+ debug="${build.debug}"
+ output="${build.dir}/${project::get-name()}.dll">
+
+ <sources>
+ <include name="**/*.cs" />
+ </sources>
+
+ <references>
+ <include name="${build.dir}/log4net.dll" />
+ <include name="${build.dir}/nunit.framework.dll" />
+ <include name="${build.dir}/Apache.Qpid.Common.dll" />
+ <include name="${build.dir}/Apache.Qpid.Messaging.dll" />
+ <include name="${build.dir}/Apache.Qpid.Client.dll" />
+ <include name="${build.dir}/Apache.Qpid.Sasl.dll" />
+ </references>
+ </csc>
+
+ <!--<copy tofile="${build.dir}/${project::get-name()}.dll.config" file="App.config" />-->
+ <!--<copy todir="${build.dir}" file="log4net.config"/>-->
+
+ </target>
+
+ <!-- Runs all of the tests in this module. -->
+ <target name="test" depends="build">
+ <nunit2>
+ <formatter type="${nant.formatter}" usefile="false" />
+ <test>
+ <assemblies>
+ <include name="${build.dir}/${project::get-name()}.dll"/>
+ </assemblies>
+ <categories>
+ <include name="Integration"/>
+ </categories>
+ </test>
+ </nunit2>
+ </target>
+
+</project>
diff --git a/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs b/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs new file mode 100644 index 0000000000..fcaabfea3b --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/interactive/FailoverTest.cs @@ -0,0 +1,320 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interactive
+{
+ [TestFixture, Category("Interactive")]
+ public class FailoverTest : IConnectionListener
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTest));
+
+ /// <summary>Specifies the number of times to run the test cycle.</summary>
+ const int NUM_MESSAGES = 10;
+
+ /// <summary>Determines how many messages to send within each commit.</summary>
+ const int COMMIT_BATCH_SIZE = 1;
+
+ /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
+ //const int SLEEP_MILLIS = 1;
+
+ /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
+ const int TIMEOUT = 10000;
+
+ /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
+ const int FAIL_POINT = 5;
+
+ /// <summary>Specified the ack mode to use for the test.</summary>
+ AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
+
+ /// <summary>Determines whether this test runs transactionally or not. </summary>
+ bool transacted = false;
+
+ /// <summary>Holds the connection to run the test over.</summary>
+ AMQConnection _connection;
+
+ /// <summary>Holds the channel for the test message publisher. </summary>
+ IChannel publishingChannel;
+
+ /// <summary>Holds the test message publisher. </summary>
+ IMessagePublisher publisher;
+
+ /// <summary>Used to keep count of the number of messages sent. </summary>
+ int messagesSent;
+
+ /// <summary>Used to keep count of the number of messages received. </summary>
+ int messagesReceived;
+
+ /// <summary>Used to wait for test completion on. </summary>
+ private static object testComplete = new Object();
+
+ /// <summary>
+ /// Creates the test connection with a fail-over set up, and a producer/consumer pair on that connection.
+ /// </summary>
+ /// [SetUp]
+ public void Init(IConnectionInfo connectionInfo)
+ {
+ // Reset all counts.
+ messagesSent = 0;
+ messagesReceived = 0;
+
+ // Create a connection for the test.
+ _connection = new AMQConnection(connectionInfo);
+ _connection.ConnectionListener = this;
+
+ // Create a consumer to receive the test messages.
+ IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
+
+ string queueName = receivingChannel.GenerateUniqueName();
+ receivingChannel.DeclareQueue(queueName, false, true, true);
+ receivingChannel.Bind(queueName, "amq.direct", queueName);
+
+ IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(30)
+ .WithPrefetchHigh(60).Create();
+
+ consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ _connection.Start();
+
+ // Create a publisher to send the test messages.
+ publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
+ publisher = publishingChannel.CreatePublisherBuilder()
+ .WithRoutingKey(queueName)
+ .Create();
+
+ _log.Debug("connection = " + _connection);
+ _log.Debug("connectionInfo = " + connectionInfo);
+ _log.Debug("connection.AsUrl = " + _connection.toURL());
+ _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
+ }
+
+ /// <summary>
+ /// Clean up the test connection.
+ /// </summary>
+ [TearDown]
+ public virtual void Shutdown()
+ {
+ Thread.Sleep(2000);
+ _connection.Close();
+ }
+
+ /// <summary>
+ /// Runs a failover test, building up the connection information from its component parts. In particular the brokers
+ /// to fail between are seperately added into the connection info.
+ /// </summary>
+ /*[Test]
+ public void TestWithBasicInfo()
+ {
+ _log.Debug("public void TestWithBasicInfo(): called");
+
+ // Manually create the connection parameters.
+ QpidConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false));
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false));
+
+ Init(connectionInfo);
+ DoFailoverTest();
+ }*/
+
+ /// <summary>
+ /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
+ /// </summary>
+ [Test]
+ public void TestWithUrl()
+ {
+ _log.Debug("public void runTestWithUrl(): called");
+
+ // Parse the connection parameters from a URL.
+ String clientId = "failover" + DateTime.Now.Ticks;
+ string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
+ "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
+
+ Init(connectionInfo);
+ DoFailoverTest();
+ }
+
+ /// <summary>
+ /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
+ /// are received within the test time limit.
+ /// </summary>
+ ///
+ /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
+ void DoFailoverTest()
+ {
+ _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): called");
+
+ for (int i = 1; i <= NUM_MESSAGES; ++i)
+ {
+ ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
+ //_log.Debug("sending message = " + msg.Text);
+ publisher.Send(msg);
+ messagesSent++;
+
+ _log.Debug("messagesSent = " + messagesSent);
+
+ if (transacted)
+ {
+ publishingChannel.Commit();
+ }
+
+ // Prompt the user to cause a failure if at the fail point.
+ if (i == FAIL_POINT)
+ {
+ PromptAndWait("Cause a broker failure now, then press return...");
+ }
+
+ //Thread.Sleep(SLEEP_MILLIS);
+ }
+
+ // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
+ bool withinTimeout;
+
+ lock(testComplete)
+ {
+ withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
+ }
+
+ if (!withinTimeout)
+ {
+ Assert.Fail("Test timed out, before all messages received.");
+ }
+
+ _log.Debug("void DoFailoverTest(IConnectionInfo connectionInfo): exiting");
+ }
+
+ /// <summary>
+ /// Receives all of the test messages.
+ /// </summary>
+ ///
+ /// <param name="message">The newly arrived test message.</param>
+ public void OnMessage(IMessage message)
+ {
+ try
+ {
+ if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+ {
+ message.Acknowledge();
+ }
+
+ messagesReceived++;
+
+ _log.Debug("messagesReceived = " + messagesReceived);
+
+ // Check if all of the messages in the test have been received, in which case notify the message producer that the test has
+ // succesfully completed.
+ if (messagesReceived == NUM_MESSAGES)
+ {
+ lock (testComplete)
+ {
+ Monitor.Pulse(testComplete);
+ }
+ }
+ }
+ catch (QpidException e)
+ {
+ _log.Fatal("Exception received. About to stop.", e);
+ Stop();
+ }
+ }
+
+ /// <summary>Prompts the user on stdout and waits for a reply on stdin, using the specified prompt message.</summary>
+ ///
+ /// <param name="message">The message to prompt the user with.</param>
+ private void PromptAndWait(string message)
+ {
+ Console.WriteLine("\n" + message);
+ Console.ReadLine();
+ }
+
+ // <summary>Closes the test connection.</summary>
+ private void Stop()
+ {
+ _log.Debug("Stopping...");
+ try
+ {
+ _connection.Close();
+ }
+ catch (QpidException e)
+ {
+ _log.Debug("Failed to shutdown: ", e);
+ }
+ }
+
+ /// <summary>
+ /// Called when bytes have been transmitted to the server
+ /// </summary>
+ ///
+ /// <param>count the number of bytes sent in total since the connection was opened</param>
+ public void BytesSent(long count) {}
+
+ /// <summary>
+ /// Called when some bytes have been received on a connection
+ /// </summary>
+ ///
+ /// <param>count the number of bytes received in total since the connection was opened</param>
+ public void BytesReceived(long count) {}
+
+ /// <summary>
+ /// Called after the infrastructure has detected that failover is required but before attempting failover.
+ /// </summary>
+ ///
+ /// <param>redirect true if the broker requested redirect. false if failover is occurring due to a connection error.</param>
+ ///
+ /// <return>true to continue failing over, false to veto failover and raise a connection exception</return>
+ public bool PreFailover(bool redirect)
+ {
+ _log.Debug("public bool PreFailover(bool redirect): called");
+ return true;
+ }
+
+ /// <summary>
+ /// Called after connection has been made to another broker after failover has been started but before
+ /// any resubscription has been done.
+ /// </summary>
+ ///
+ /// <return> true to continue with resubscription, false to prevent automatic resubscription. This is useful in
+ /// cases where the application wants to handle resubscription. Note that in the latter case all sessions, producers
+ /// and consumers are invalidated.
+ /// </return>
+ public bool PreResubscribe()
+ {
+ _log.Debug("public bool PreResubscribe(): called");
+ return true;
+ }
+
+ /// <summary>
+ /// Called once failover has completed successfully. This is called irrespective of whether the client has
+ /// vetoed automatic resubscription.
+ /// </summary>
+ public void FailoverComplete()
+ {
+ _log.Debug("public void FailoverComplete(): called");
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/interop/InteropClientTestCase.cs b/dotnet/Qpid.Integration.Tests/interop/InteropClientTestCase.cs new file mode 100644 index 0000000000..7bc0fcf21a --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/interop/InteropClientTestCase.cs @@ -0,0 +1,67 @@ +using System;
+using System.Text;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interop
+{
+ /// <summary> Defines the possible test case roles that an interop test case can take on. </summary>
+ public enum Roles { SENDER, RECEIVER };
+
+ /// <summary>
+ /// InteropClientTestCase provides an interface that classes implementing test cases from the interop testing spec
+ /// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification) should implement.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities
+ /// <tr><td> Supply the name of the test case that this implements.
+ /// <tr><td> Accept/Reject invites based on test parameters.
+ /// <tr><td> Adapt to assigned roles.
+ /// <tr><td> Perform test case actions.
+ /// <tr><td> Generate test reports.
+ /// </table>
+ /// </summary>
+ interface InteropClientTestCase
+ {
+ /// <summary>
+ /// Should provide the name of the test case that this class implements. The exact names are defined in the
+ /// interop testing spec.
+ /// </summary>
+ ///
+ /// <returns> The name of the test case that this implements. </returns>
+ string GetName();
+
+ /// <summary>
+ /// Determines whether the test invite that matched this test case is acceptable.
+ /// </summary>
+ ///
+ /// <param name="inviteMessage"> The invitation to accept or reject. </param>
+ ///
+ /// <returns> <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it. </returns>
+ ///
+ /// @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
+ bool AcceptInvite(IMessage inviteMessage);
+
+ /// <summary>
+ /// Assigns the role to be played by this test case. The test parameters are fully specified in the
+ /// assignment message. When this method return the test case will be ready to execute.
+ /// </summary>
+ ///
+ /// <param name="role"> The role to be played; sender or receiver. </param>
+ /// <param name="assignRoleMessage"> The role assingment message, contains the full test parameters. </param>
+ void AssignRole(Roles role, IMessage assignRoleMessage);
+
+ /// <summary>
+ /// Performs the test case actions.
+ /// </summary>
+ void Start();
+
+ /// <summary>
+ /// Gets a report on the actions performed by the test case in its assigned role.
+ /// </summary>
+ ///
+ /// <param name="session"> The session to create the report message in. </param>
+ ///
+ /// <returns> The report message. </returns>
+ IMessage GetReport(IChannel channel);
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase1DummyRun.cs b/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase1DummyRun.cs new file mode 100644 index 0000000000..c3c159ca6c --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase1DummyRun.cs @@ -0,0 +1,69 @@ +using System;
+using System.Text;
+using log4net;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interop.TestCases
+{
+ /// <summary>
+ /// Implements tet case 1, dummy run. This test case sends no test messages, it exists to confirm that the test harness
+ /// is interacting with the coordinator correctly.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Supply the name of the test case that this implements.
+ /// <tr><td> Accept/Reject invites based on test parameters.
+ /// <tr><td> Adapt to assigned roles.
+ /// <tr><td> Perform test case actions.
+ /// <tr><td> Generate test reports.
+ /// </table>
+ /// </summary>
+ public class TestCase1DummyRun : InteropClientTestCase
+ {
+ private static ILog log = LogManager.GetLogger(typeof(TestCase1DummyRun));
+
+ public String GetName()
+ {
+ log.Debug("public String getName(): called");
+
+ return "TC1_DummyRun";
+ }
+
+ public bool AcceptInvite(IMessage inviteMessage)
+ {
+ log.Debug("public boolean acceptInvite(Message inviteMessage): called");
+
+ // Test parameters don't matter, accept all invites.
+ return true;
+ }
+
+ public void AssignRole(Roles role, IMessage assignRoleMessage)
+ {
+ log.Debug("public void assignRole(Roles role, Message assignRoleMessage): called");
+
+ // Do nothing, both roles are the same.
+ }
+
+ public void Start()
+ {
+ log.Debug("public void start(): called");
+
+ // Do nothing.
+ }
+
+ public IMessage GetReport(IChannel channel)
+ {
+ log.Debug("public Message getReport(Session session): called");
+
+ // Generate a dummy report, the coordinator expects a report but doesn't care what it is.
+ return channel.CreateTextMessage("Dummy Run, Ok.");
+ }
+
+ public void OnMessage(IMessage message)
+ {
+ log.Debug("public void onMessage(Message message = " + message + "): called");
+
+ // Ignore any messages.
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase2BasicP2P.cs b/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase2BasicP2P.cs new file mode 100644 index 0000000000..f4f0c7dbd3 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase2BasicP2P.cs @@ -0,0 +1,185 @@ +using System;
+using System.Text;
+using log4net;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interop.TestCases
+{
+ /// <summary>
+ /// Implements test case 2, basic P2P. Sends/receives a specified number of messages to a specified route on the
+ /// default direct exchange. Produces reports on the actual number of messages sent/received.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Supply the name of the test case that this implements.
+ /// <tr><td> Accept/Reject invites based on test parameters.
+ /// <tr><td> Adapt to assigned roles.
+ /// <tr><td> Send required number of test messages.
+ /// <tr><td> Generate test reports.
+ /// </table>
+ /// </summary>
+ public class TestCase2BasicP2P : InteropClientTestCase
+ {
+ /// <summary> Used for debugging. </summary>
+ private static ILog log = LogManager.GetLogger(typeof(TestCase2BasicP2P));
+
+ /// <summary> Holds the count of test messages received. </summary>
+ private int messageCount;
+
+ /// <summary> The role to be played by the test. </summary>
+ private Roles role;
+
+ /// <summary> The number of test messages to send. </summary>
+ private int numMessages;
+
+ /// <summary> The routing key to send them to on the default direct exchange. </summary>
+ private string sendDestination;
+
+ /// <summary> The connection to send the test messages on. </summary>
+ private IConnection connection;
+
+ /// <summary> The session to send the test messages on. </summary>
+ private IChannel channel;
+
+ /// <summary> The producer to send the test messages with. </summary>
+ private IMessagePublisher publisher;
+
+ /// <summary>
+ /// Should provide the name of the test case that this class implements. The exact names are defined in the
+ /// interop testing spec.
+ /// </summary>
+ ///
+ /// <returns> The name of the test case that this implements. </returns>
+ public String GetName()
+ {
+ log.Debug("public String GetName(): called");
+
+ return "TC2_BasicP2P";
+ }
+
+ /// <summary>
+ /// Determines whether the test invite that matched this test case is acceptable.
+ /// </summary>
+ ///
+ /// <param name="inviteMessage"> The invitation to accept or reject. </param>
+ ///
+ /// <returns> <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it. </returns>
+ public bool AcceptInvite(IMessage inviteMessage)
+ {
+ log.Debug("public boolean AcceptInvite(Message inviteMessage = " + inviteMessage + "): called");
+
+ // All invites are acceptable.
+ return true;
+ }
+
+ /// <summary>
+ /// Assigns the role to be played by this test case. The test parameters are fully specified in the
+ /// assignment message. When this method return the test case will be ready to execute.
+ /// </summary>
+ ///
+ /// <param name="role"> The role to be played; sender or receiver. </param>
+ /// <param name="assignRoleMessage"> The role assingment message, contains the full test parameters. </param>
+ public void AssignRole(Roles role, IMessage assignRoleMessage)
+ {
+ log.Debug("public void AssignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+ + "): called");
+
+ // Reset the message count for a new test.
+ messageCount = 0;
+
+ // Take note of the role to be played.
+ this.role = role;
+
+ // Create a new connection to pass the test messages on.
+ connection =
+ TestClient.CreateConnection(TestClient.brokerUrl, TestClient.virtualHost);
+ channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
+
+ // Extract and retain the test parameters.
+ numMessages = assignRoleMessage.Headers.GetInt("P2P_NUM_MESSAGES");
+ string queueAndKeyName = assignRoleMessage.Headers.GetString("P2P_QUEUE_AND_KEY_NAME");
+ channel.DeclareQueue(queueAndKeyName, false, true, true);
+ channel.Bind(queueAndKeyName, ExchangeNameDefaults.DIRECT, queueAndKeyName);
+ sendDestination = queueAndKeyName;
+
+ log.Debug("numMessages = " + numMessages);
+ log.Debug("sendDestination = " + sendDestination);
+ log.Debug("role = " + role);
+
+ switch (role)
+ {
+ // Check if the sender role is being assigned, and set up a message producer if so.
+ case Roles.SENDER:
+ publisher = channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.DIRECT)
+ .WithRoutingKey(sendDestination)
+ .Create();
+ break;
+
+ // Otherwise the receiver role is being assigned, so set this up to listen for messages.
+ case Roles.RECEIVER:
+ IMessageConsumer consumer = channel.CreateConsumerBuilder(sendDestination).Create();
+ consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
+
+ break;
+ }
+
+ connection.Start();
+ }
+
+ /// <summary> Performs the test case actions. </summary>
+ public void Start()
+ {
+ log.Debug("public void start(): called");
+
+ // Check that the sender role is being performed.
+ if (role == Roles.SENDER)
+ {
+ IMessage testMessage = channel.CreateTextMessage("test");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ publisher.Send(testMessage);
+
+ // Increment the message count.
+ messageCount++;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Gets a report on the actions performed by the test case in its assigned role.
+ /// </summary>
+ ///
+ /// <param name="session"> The session to create the report message in. </param>
+ ///
+ /// <returns> The report message. </returns>
+ public IMessage GetReport(IChannel channel)
+ {
+ log.Debug("public Message GetReport(IChannel channel): called");
+
+ // Close the test connection.
+ //connection.Stop();
+
+ // Generate a report message containing the count of the number of messages passed.
+ IMessage report = channel.CreateMessage();
+ //report.Headers.SetString("CONTROL_TYPE", "REPORT");
+ report.Headers.SetInt("MESSAGE_COUNT", messageCount);
+
+ return report;
+ }
+
+ /// <summary>
+ /// Counts incoming test messages.
+ /// </summary>
+ ///
+ /// <param name="message"> The incoming test message. </param>
+ public void OnMessage(IMessage message)
+ {
+ log.Debug("public void OnMessage(IMessage message = " + message + "): called");
+
+ // Increment the message count.
+ messageCount++;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase3BasicPubSub.cs b/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase3BasicPubSub.cs new file mode 100644 index 0000000000..a9f073ce04 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/interop/TestCases/TestCase3BasicPubSub.cs @@ -0,0 +1,224 @@ +using System;
+using System.Text;
+using log4net;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.interop.TestCases
+{
+ /// <summary>
+ /// Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
+ /// default topic exchange, using the specified number of receiver connections. Produces reports on the actual number of
+ /// messages sent/received.
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Supply the name of the test case that this implements.
+ /// <tr><td> Accept/Reject invites based on test parameters.
+ /// <tr><td> Adapt to assigned roles.
+ /// <tr><td> Send required number of test messages using pub/sub.
+ /// <tr><td> Generate test reports.
+ /// </table>
+ /// </summary>
+ public class TestCase3BasicPubSub : InteropClientTestCase
+ {
+ /// <summary> Used for debugging. </summary>
+ private static ILog log = LogManager.GetLogger(typeof(TestCase3BasicPubSub));
+
+ /// <summary> Holds the count of test messages received. </summary>
+ private int messageCount;
+
+ /// <summary> The role to be played by the test. </summary>
+ private Roles role;
+
+ /// <summary> The number of test messages to send. </summary>
+ private int numMessages;
+
+ /// <summary> The number of receiver connection to use. </summary>
+ private int numReceivers;
+
+ /// <summary> The routing key to send them to on the default direct exchange. </summary>
+ private string sendDestination;
+
+ /// <summary> The connections to send/receive the test messages on. </summary>
+ private IConnection[] connection;
+
+ /// <summary> The sessions to send/receive the test messages on. </summary>
+ private IChannel[] channel;
+
+ /// <summary> The producer to send the test messages with. </summary>
+ IMessagePublisher publisher;
+
+ /// <summary>
+ /// Should provide the name of the test case that this class implements. The exact names are defined in the
+ /// interop testing spec.
+ /// </summary>
+ ///
+ /// <returns> The name of the test case that this implements. </returns>
+ public String GetName()
+ {
+ log.Debug("public String GetName(): called");
+
+ return "TC3_BasicPubSub";
+ }
+
+ /// <summary>
+ /// Determines whether the test invite that matched this test case is acceptable.
+ /// </summary>
+ ///
+ /// <param name="inviteMessage"> The invitation to accept or reject. </param>
+ ///
+ /// <returns> <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it. </returns>
+ public bool AcceptInvite(IMessage inviteMessage)
+ {
+ log.Debug("public boolean AcceptInvite(IMessage inviteMessage = " + inviteMessage + "): called");
+
+ // All invites are acceptable.
+ return true;
+ }
+
+ /// <summary>
+ /// Assigns the role to be played by this test case. The test parameters are fully specified in the
+ /// assignment message. When this method return the test case will be ready to execute.
+ /// </summary>
+ ///
+ /// <param name="role"> The role to be played; sender or receiver. </param>
+ /// <param name="assignRoleMessage"> The role assingment message, contains the full test parameters. </param>
+ public void AssignRole(Roles role, IMessage assignRoleMessage)
+ {
+ log.Debug("public void assignRole(Roles role = " + role + ", IMessage assignRoleMessage = " + assignRoleMessage
+ + "): called");
+
+ // Reset the message count for a new test.
+ messageCount = 0;
+
+ // Take note of the role to be played.
+ this.role = role;
+
+ // Extract and retain the test parameters.
+ numMessages = assignRoleMessage.Headers.GetInt("PUBSUB_NUM_MESSAGES");
+ numReceivers = assignRoleMessage.Headers.GetInt("PUBSUB_NUM_RECEIVERS");
+ string sendKey = assignRoleMessage.Headers.GetString("PUBSUB_KEY");
+ sendDestination = sendKey;
+
+ log.Debug("numMessages = " + numMessages);
+ log.Debug("numReceivers = " + numReceivers);
+ log.Debug("sendKey = " + sendKey);
+ log.Debug("role = " + role);
+
+ switch (role)
+ {
+ // Check if the sender role is being assigned, and set up a single message producer if so.
+ case Roles.SENDER:
+ // Create a new connection to pass the test messages on.
+ connection = new IConnection[1];
+ channel = new IChannel[1];
+
+ connection[0] =
+ TestClient.CreateConnection(TestClient.brokerUrl, TestClient.virtualHost);
+ channel[0] = connection[0].CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
+
+ // Extract and retain the test parameters.
+ publisher = channel[0].CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(sendDestination)
+ .WithMandatory(false)
+ .WithImmediate(false)
+ .Create();
+ break;
+
+ // Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number
+ // of receiver connections.
+ case Roles.RECEIVER:
+ // Create the required number of receiver connections.
+ connection = new IConnection[numReceivers];
+ channel = new IChannel[numReceivers];
+
+ for (int i = 0; i < numReceivers; i++)
+ {
+ connection[i] =
+ TestClient.CreateConnection(TestClient.brokerUrl, TestClient.virtualHost);
+ channel[i] = connection[i].CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
+
+ IMessageConsumer consumer = channel[i].CreateConsumerBuilder(sendDestination).Create();
+ consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
+ }
+
+ break;
+ }
+
+ // Start all the connection dispatcher threads running.
+ foreach (IConnection con in connection)
+ {
+ con.Start();
+ }
+ }
+
+ /// <summary>
+ /// Performs the test case actions.
+ /// </summary>
+ public void Start()
+ {
+ log.Debug("public void Start(): called");
+
+ // Check that the sender role is being performed.
+ if (role == Roles.SENDER)
+ {
+ IMessage testMessage = channel[0].CreateTextMessage("test");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ publisher.Send(testMessage);
+
+ // Increment the message count.
+ messageCount++;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Gets a report on the actions performed by the test case in its assigned role.
+ /// </summary>
+ ///
+ /// <param name="session"> The session to create the report message in. </param>
+ ///
+ /// <returns> The report message. </returns>
+ public IMessage GetReport(IChannel channel)
+ {
+ log.Debug("public IMessage getReport(IChannel channel): called");
+
+ // Close the test connections.
+ /*foreach (IConnection con in connection)
+ {
+ try
+ {
+ con.Stop();
+ }
+ catch (AMQConnectionClosedException e)
+ {
+ // The connection has already died due to an error. Log this as a warning.
+ log.Warn("Connection already closed.");
+ }
+ }*/
+
+ // Generate a report message containing the count of the number of messages passed.
+ IMessage report = channel.CreateMessage();
+ //report.Headers.SetString("CONTROL_TYPE", "REPORT");
+ report.Headers.SetInt("MESSAGE_COUNT", messageCount);
+
+ return report;
+ }
+
+ /// <summary>
+ /// Counts incoming test messages.
+ /// </summary>
+ ///
+ /// <param name="message"> The incoming test message. </param>
+ public void OnMessage(IMessage message)
+ {
+ log.Debug("public void onMessage(IMessage message = " + message + "): called");
+
+ // Increment the message count.
+ messageCount++;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/interop/TestClient.cs b/dotnet/Qpid.Integration.Tests/interop/TestClient.cs new file mode 100644 index 0000000000..a0fdc1c59f --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/interop/TestClient.cs @@ -0,0 +1,359 @@ +using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using log4net;
+using Apache.Qpid.Integration.Tests.interop.TestCases;
+
+namespace Apache.Qpid.Integration.Tests.interop
+{
+ /// <summary>
+ /// Implements a test client as described in the interop testing spec
+ /// (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
+ /// reacts to control message sequences send by the test coordinator.
+ ///
+ /// <p/><table><caption>Messages Handled by TestClient</caption>
+ /// <tr><th> Message <th> Action
+ /// <tr><td> Invite(compulsory) <td> Reply with Enlist.
+ /// <tr><td> Invite(test case) <td> Reply with Enlist if test case available.
+ /// <tr><td> AssignRole(test case) <td> Reply with Accept Role if matches an enlisted test. Keep test parameters.
+ /// <tr><td> Start <td> Send test messages defined by test parameters. Send report on messages sent.
+ /// <tr><td> Status Request <td> Send report on messages received.
+ /// </table>
+ ///
+ /// <p><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Handle all incoming control messages. <td> {@link InteropClientTestCase}
+ /// <tr><td> Configure and look up test cases by name. <td> {@link InteropClientTestCase}
+ /// </table>
+ /// </summary>
+ public class TestClient
+ {
+ private static ILog log = LogManager.GetLogger(typeof(TestClient));
+
+ /// <summary> Defines the default broker for the tests, localhost, default port. </summary>
+ public static string DEFAULT_BROKER_URL = "amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'";
+
+ /// <summary> Defines the default virtual host to use for the tests, none. </summary>
+ public static string DEFAULT_VIRTUAL_HOST = "";
+
+ /// <summary> Defines the default identifying name of this test client. </summary>
+ public static string DEFAULT_CLIENT_NAME = "dotnet";
+
+ /// <summary> Holds the URL of the broker to run the tests on. </summary>
+ public static string brokerUrl;
+
+ /// <summary> Holds the virtual host to run the tests on. If <tt>null</tt>, then the default virtual host is used. </summary>
+ public static string virtualHost;
+
+ /// <summary> The clients identifying name to print in test results and to distinguish from other clients. </summary>
+ private string clientName;
+
+ /// <summary> Holds all the test cases. </summary>
+ private IDictionary testCases = new Hashtable();
+
+ InteropClientTestCase currentTestCase;
+
+ private MessagePublisherBuilder publisherBuilder;
+
+ private IChannel channel;
+
+ /// <summary> Monitor to wait for termination events on. </summary>
+ private static object terminationMonitor = new Object();
+
+ /// <summary>
+ /// Creates a new interop test client, listenting to the specified broker and virtual host, with the specified
+ /// client identifying name.
+ /// </summary>
+ ///
+ /// <param name="brokerUrl"> The url of the broker to connect to. </param>
+ /// <param name="virtualHost"> The virtual host to conect to. </param>
+ /// <param name="clientName"> The client name to use. </param>
+ public TestClient(string brokerUrl, string virtualHost, string clientName)
+ {
+ log.Info("public TestClient(string brokerUrl = " + brokerUrl + ", string virtualHost = " + virtualHost
+ + ", string clientName = " + clientName + "): called");
+
+ // Retain the connection parameters.
+ TestClient.brokerUrl = brokerUrl;
+ TestClient.virtualHost = virtualHost;
+ this.clientName = clientName;
+ }
+
+
+ /// <summary>
+ /// The entry point for the interop test coordinator. This client accepts the following command line arguments:
+ /// </summary>
+ ///
+ /// <p/><table>
+ /// <tr><td> -b <td> The broker URL. <td> Optional.
+ /// <tr><td> -h <td> The virtual host. <td> Optional.
+ /// <tr><td> -n <td> The test client name. <td> Optional.
+ /// <tr><td> name=value <td> Trailing argument define name/value pairs. Added to system properties. <td> Optional.
+ /// </table>
+ ///
+ /// <param name="args"> The command line arguments. </param>
+ public static void Main(string[] args)
+ {
+ // Extract the command line options (Not exactly Posix but it will do for now...).
+ string brokerUrl = DEFAULT_BROKER_URL;
+ string virtualHost = DEFAULT_VIRTUAL_HOST;
+ string clientName = DEFAULT_CLIENT_NAME;
+
+ foreach (string nextArg in args)
+ {
+ if (nextArg.StartsWith("-b"))
+ {
+ brokerUrl = nextArg.Substring(2);
+ }
+ else if (nextArg.StartsWith("-h"))
+ {
+ virtualHost = nextArg.Substring(2);
+ }
+ else if (nextArg.StartsWith("-n"))
+ {
+ clientName = nextArg.Substring(2);
+ }
+ }
+
+ NDC.Push(clientName);
+
+ // Create a test client and start it running.
+ TestClient client = new TestClient(brokerUrl, virtualHost, clientName);
+
+ try
+ {
+ client.Start();
+ }
+ catch (Exception e)
+ {
+ log.Error("The test client was unable to start.", e);
+ System.Environment.Exit(1);
+ }
+
+ // Wait for a signal on the termination monitor before quitting.
+ lock (terminationMonitor)
+ {
+ Monitor.Wait(terminationMonitor);
+ }
+
+ NDC.Pop();
+ }
+
+ /// <summary>
+ /// Starts the interop test client running. This causes it to start listening for incoming test invites.
+ /// </summary>
+ private void Start()
+ {
+ log.Info("private void Start(): called");
+
+ // Use a class path scanner to find all the interop test case implementations.
+ ArrayList testCaseClasses = new ArrayList();
+
+ // ClasspathScanner.getMatches(InteropClientTestCase.class, "^TestCase.*", true);
+ // Hard code the test classes till the classpath scanner is fixed.
+ testCaseClasses.Add(typeof(TestCase1DummyRun));
+ testCaseClasses.Add(typeof(TestCase2BasicP2P));
+ testCaseClasses.Add(typeof(TestCase3BasicPubSub));
+
+ // Create all the test case implementations and index them by the test names.
+ foreach (Type testClass in testCaseClasses)
+ {
+ InteropClientTestCase testCase = (InteropClientTestCase)Activator.CreateInstance(testClass);
+ testCases.Add(testCase.GetName(), testCase);
+
+ log.Info("Found test case: " + testClass);
+ }
+
+ // Open a connection to communicate with the coordinator on.
+ log.Info("brokerUrl = " + brokerUrl);
+ IConnection connection = CreateConnection(brokerUrl, virtualHost);
+
+ channel = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
+
+ // Set this up to listen for control messages.
+ string responseQueueName = channel.GenerateUniqueName();
+ channel.DeclareQueue(responseQueueName, false, true, true);
+
+ channel.Bind(responseQueueName, ExchangeNameDefaults.TOPIC, "iop.control." + clientName);
+ channel.Bind(responseQueueName, ExchangeNameDefaults.TOPIC, "iop.control");
+
+ IMessageConsumer consumer = channel.CreateConsumerBuilder(responseQueueName)
+ .Create();
+ consumer.OnMessage += new MessageReceivedDelegate(OnMessage);
+
+ // Create a publisher to send replies with.
+ publisherBuilder = channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.DIRECT);
+
+
+ // Start listening for incoming control messages.
+ connection.Start();
+ Console.WriteLine("Test client " + clientName + " ready to receive test control messages...");
+ }
+
+ /// <summary>
+ /// Establishes an AMQ connection. This is a simple convenience method for code that does not anticipate handling connection failures.
+ /// All exceptions that indicate that the connection has failed, are allowed to fall through.
+ /// </summary>
+ ///
+ /// <param name="brokerUrl"> The broker url to connect to, <tt>null</tt> to use the default from the properties. </param>
+ /// <param name="virtualHost"> The virtual host to connectio to, <tt>null</tt> to use the default. </param>
+ ///
+ /// <returns> A JMS conneciton. </returns>
+ public static IConnection CreateConnection(string brokerUrl, string virtualHost)
+ {
+ log.Info("public static Connection createConnection(string brokerUrl = " + brokerUrl + ", string virtualHost = "
+ + virtualHost + "): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(brokerUrl);
+ connectionInfo.VirtualHost = virtualHost;
+ IConnection connection = new AMQConnection(connectionInfo);
+
+ return connection;
+ }
+
+ /// <summary>
+ /// Handles all incoming control messages.
+ /// </summary>
+ ///
+ /// <param name="message"> The incoming message. </param>
+ public void OnMessage(IMessage message)
+ {
+ log.Info("public void OnMessage(IMessage message = " + message + "): called");
+
+ try
+ {
+ string controlType = message.Headers.GetString("CONTROL_TYPE");
+ string testName = message.Headers.GetString("TEST_NAME");
+
+ // Check if the message is a test invite.
+ if ("INVITE" == controlType)
+ {
+ string testCaseName = message.Headers.GetString("TEST_NAME");
+
+ // Flag used to indicate that an enlist should be sent. Only enlist to compulsory invites or invites
+ // for which test cases exist.
+ bool enlist = false;
+
+ if (testCaseName != null)
+ {
+ log.Info("Got an invite to test: " + testCaseName);
+
+ // Check if the requested test case is available.
+ InteropClientTestCase testCase = (InteropClientTestCase)testCases[testCaseName];
+
+ if (testCase != null)
+ {
+ // Make the requested test case the current test case.
+ currentTestCase = testCase;
+ enlist = true;
+ }
+ }
+ else
+ {
+ log.Info("Got a compulsory invite.");
+
+ enlist = true;
+ }
+
+ log.Info("enlist = " + enlist);
+
+ if (enlist)
+ {
+ // Reply with the client name in an Enlist message.
+ IMessage enlistMessage = channel.CreateMessage();
+ enlistMessage.Headers.SetString("CONTROL_TYPE", "ENLIST");
+ enlistMessage.Headers.SetString("CLIENT_NAME", clientName);
+ enlistMessage.Headers.SetString("CLIENT_PRIVATE_CONTROL_KEY", "iop.control." + clientName);
+ enlistMessage.CorrelationId = message.CorrelationId;
+
+ Send(enlistMessage, message.ReplyToRoutingKey);
+ }
+ }
+ else if ("ASSIGN_ROLE" == controlType)
+ {
+ // Assign the role to the current test case.
+ string roleName = message.Headers.GetString("ROLE");
+
+ log.Info("Got a role assignment to role: " + roleName);
+
+ Roles role;
+
+ if (roleName == "SENDER")
+ {
+ role = Roles.SENDER;
+ }
+ else
+ {
+ role = Roles.RECEIVER;
+ }
+
+ currentTestCase.AssignRole(role, message);
+
+ // Reply by accepting the role in an Accept Role message.
+ IMessage acceptRoleMessage = channel.CreateMessage();
+ acceptRoleMessage.Headers.SetString("CONTROL_TYPE", "ACCEPT_ROLE");
+ acceptRoleMessage.CorrelationId = message.CorrelationId;
+
+ Send(acceptRoleMessage, message.ReplyToRoutingKey);
+ }
+ else if ("START" == controlType || "STATUS_REQUEST" == controlType)
+ {
+ if ("START" == controlType)
+ {
+ log.Info("Got a start notification.");
+
+ // Start the current test case.
+ currentTestCase.Start();
+ }
+ else
+ {
+ log.Info("Got a status request.");
+ }
+
+ // Generate the report from the test case and reply with it as a Report message.
+ IMessage reportMessage = currentTestCase.GetReport(channel);
+ reportMessage.Headers.SetString("CONTROL_TYPE", "REPORT");
+ reportMessage.CorrelationId = message.CorrelationId;
+
+ Send(reportMessage, message.ReplyToRoutingKey);
+ }
+ else if ("TERMINATE" == controlType)
+ {
+ Console.WriteLine("Received termination instruction from coordinator.");
+
+ // Is a cleaner shutdown needed?
+ System.Environment.Exit(1);
+ }
+ else
+ {
+ // Log a warning about this but otherwise ignore it.
+ log.Warn("Got an unknown control message, controlType = " + controlType + ", message = " + message);
+ }
+ }
+ catch (QpidException e)
+ {
+ // Log a warning about this, but otherwise ignore it.
+ log.Warn("A QpidException occurred whilst handling a message.");
+ log.Info("Got QpidException whilst handling message: " + message, e);
+ }
+ }
+
+ /// <summary>
+ /// Send the specified message using the specified routing key on the direct exchange.
+ /// </summary>
+ ///
+ /// <param name="message"> The message to send.</param>
+ /// <param name="routingKey"> The routing key to send the message with.</param>
+ public void Send(IMessage message, string routingKey)
+ {
+ IMessagePublisher publisher = publisherBuilder.WithRoutingKey(routingKey).Create();
+ publisher.Send(message);
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/old/ServiceProvidingClient.tmp b/dotnet/Qpid.Integration.Tests/old/ServiceProvidingClient.tmp new file mode 100644 index 0000000000..b1e7a50aaa --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/old/ServiceProvidingClient.tmp @@ -0,0 +1,150 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ [TestFixture, Category("Integration")]
+ public class ServiceProvidingClient : BaseMessagingTestFixture
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(ServiceProvidingClient));
+
+ private int _messageCount;
+
+ private string _replyToExchangeName;
+ private string _replyToRoutingKey;
+ const int PACK = 100;
+
+ private IMessagePublisher _destinationPublisher;
+ private IMessageConsumer _consumer;
+
+ private string _serviceName = "ServiceQ1";
+
+ private string _selector = null;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+
+ _logger.Info("Starting...");
+ _logger.Info("Service (queue) name is '" + _serviceName + "'...");
+
+ _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException);
+
+ _logger.Info("Message selector is <" + _selector + ">...");
+
+ _channel.DeclareQueue(_serviceName, false, false, false);
+
+ _consumer = _channel.CreateConsumerBuilder(_serviceName)
+ .WithPrefetchLow(100)
+ .WithPrefetchHigh(500)
+ .WithNoLocal(true)
+ .Create();
+ _consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ }
+
+ public override void Shutdown()
+ {
+ _consumer.Dispose();
+ base.Shutdown();
+ }
+
+ private void OnConnectionException(Exception e)
+ {
+ _logger.Info("Connection exception occurred", e);
+ // XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat?
+ }
+
+ [Test]
+ public void Test()
+ {
+ _connection.Start();
+ _logger.Info("Waiting...");
+
+ ServiceRequestingClient client = new ServiceRequestingClient();
+ client.Init();
+ client.SendMessages();
+ }
+
+ private void OnMessage(IMessage message)
+ {
+// _logger.Info("Got message '" + message + "'");
+
+ ITextMessage tm = (ITextMessage)message;
+
+ try
+ {
+ string replyToExchangeName = tm.ReplyToExchangeName;
+ string replyToRoutingKey = tm.ReplyToRoutingKey;
+
+ _replyToExchangeName = replyToExchangeName;
+ _replyToRoutingKey = replyToRoutingKey;
+ _logger.Debug("About to create a producer");
+
+// Console.WriteLine("ReplyTo.ExchangeName = " + _replyToExchangeName);
+// Console.WriteLine("ReplyTo.RoutingKey = " + _replyToRoutingKey);
+
+ _destinationPublisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(_replyToExchangeName)
+ .WithRoutingKey(_replyToRoutingKey)
+ .WithDeliveryMode(DeliveryMode.NonPersistent)
+ .Create();
+ _destinationPublisher.DisableMessageTimestamp = true;
+ _logger.Debug("After create a producer");
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error creating destination", e);
+ throw e;
+ }
+ _messageCount++;
+ if (_messageCount % PACK == 0)
+ {
+ _logger.Info("Received message total: " + _messageCount);
+ _logger.Info(string.Format("Sending response to '{0}:{1}'",
+ _replyToExchangeName, _replyToRoutingKey));
+ }
+
+ try
+ {
+ String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text;
+ ITextMessage msg = _channel.CreateTextMessage(payload);
+ if ( tm.Headers.Contains("timeSent") )
+ {
+ msg.Headers["timeSent"] = tm.Headers["timeSent"];
+ }
+ _destinationPublisher.Send(msg);
+ } catch ( QpidException e )
+ {
+ _logger.Error("Error sending message: " + e, e);
+ throw e;
+ } finally
+ {
+ _destinationPublisher.Dispose();
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp b/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp new file mode 100644 index 0000000000..da0f764bcd --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/old/ServiceRequestingClient.tmp @@ -0,0 +1,182 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ public class ServiceRequestingClient : BaseMessagingTestFixture
+ {
+ private const int MESSAGE_SIZE = 1024;
+ private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE);
+
+ private const int PACK = 100;
+ private const int NUM_MESSAGES = PACK*10; // increase when in standalone
+
+ private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient));
+
+ ManualResetEvent _finishedEvent = new ManualResetEvent(false);
+
+ private int _expectedMessageCount = NUM_MESSAGES;
+
+ private long _startTime = 0;
+
+ private string _commandQueueName = "ServiceQ1";
+
+ private IMessagePublisher _publisher;
+
+ Avergager averager = new Avergager();
+
+ private void InitialiseProducer()
+ {
+ try
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithRoutingKey(_commandQueueName)
+ .WithDeliveryMode(DeliveryMode.NonPersistent)
+ .Create();
+ _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder?
+ }
+ catch (QpidException e)
+ {
+ _log.Error("Error: " + e, e);
+ }
+ }
+
+ [Test]
+ public void SendMessages()
+ {
+ InitialiseProducer();
+
+ string replyQueueName = _channel.GenerateUniqueName();
+
+ _channel.DeclareQueue(replyQueueName, false, true, true);
+
+ IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName)
+ .WithPrefetchLow(100)
+ .WithPrefetchHigh(200)
+ .WithNoLocal(true)
+ .WithExclusive(true).Create();
+
+ _startTime = DateTime.Now.Ticks;
+
+ messageConsumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ _connection.Start();
+ for (int i = 0; i < _expectedMessageCount; i++)
+ {
+ ITextMessage msg;
+ try
+ {
+ msg = _channel.CreateTextMessage(MESSAGE_DATA + i);
+ }
+ catch (Exception e)
+ {
+ _log.Error("Error creating message: " + e, e);
+ break;
+ }
+ msg.ReplyToRoutingKey = replyQueueName;
+
+ // Added timestamp.
+ long timeNow = DateTime.Now.Ticks;
+ string timeSentString = String.Format("{0:G}", timeNow);
+ msg.Headers.SetLong("timeSent", timeNow);
+
+ _publisher.Send(msg);
+ }
+
+ // Assert that the test finishes within a reasonable amount of time.
+ const int waitSeconds = 40;
+ const int waitMilliseconds = waitSeconds * 1000;
+ _log.Info("Finished sending " + _expectedMessageCount + " messages");
+ _log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds));
+ Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false),
+ String.Format("Expected to finish in {0} seconds", waitSeconds));
+ }
+
+ public void OnMessage(IMessage m)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Message received: " + m);
+ }
+
+ if (!m.Headers.Contains("timeSent"))
+ {
+ throw new Exception("Set timeSent!");
+ }
+
+ long sentAt = m.Headers.GetLong("timeSent");
+ long now = DateTime.Now.Ticks;
+ long latencyTicks = now - sentAt;
+ long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond;
+
+ averager.Add(latencyMilliseconds);
+
+ if (averager.Num % PACK == 0)
+ {
+ _log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond);
+ _log.Info(String.Format("Average latency (ms) = {0}", averager));
+ _log.Info("Received message count: " + averager.Num);
+ }
+
+ if (averager.Num == _expectedMessageCount)
+ {
+ _log.Info(String.Format("Final average latency (ms) = {0}", averager));
+
+ double timeTakenSeconds = (DateTime.Now.Ticks - _startTime) * 1.0 / (TimeSpan.TicksPerMillisecond * 1000);
+ _log.Info("Total time taken to receive " + _expectedMessageCount + " messages was " +
+ timeTakenSeconds + "s, equivalent to " +
+ (_expectedMessageCount/timeTakenSeconds) + " messages per second");
+
+ _finishedEvent.Set(); // Notify main thread to quit.
+ }
+ }
+ }
+
+ class Avergager
+ {
+ long num = 0;
+ long sum = 0;
+
+ long min = long.MaxValue;
+ long max = long.MinValue;
+
+ public void Add(long item)
+ {
+ ++num;
+ sum += item;
+ if (item < min) min = item;
+ if (item > max) max = item;
+ }
+
+ public long Average { get { return sum/num; }}
+
+ public long Num { get { return num; } }
+
+ public override string ToString()
+ {
+ return String.Format("average={0} min={1} max={2}", Average, min, max);
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs new file mode 100644 index 0000000000..c208a1eb57 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/BaseMessagingTestFixture.cs @@ -0,0 +1,86 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Provides a basis for writing Unit tests that communicate with an AMQ protocol broker. By default it creates a connection
+ /// to a message broker running on localhost on the standard AMQ port, 5672, using guest:guest login credentials, on the default exchange,
+ /// 'test' queue.
+ /// </summary>
+ public class BaseMessagingTestFixture
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(BaseMessagingTestFixture));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string connectionUri = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /// <summary> Holds the test connection. </summary>
+ protected IConnection _connection;
+
+ /// <summary> Holds the test channel. </summary>
+ protected IChannel _channel;
+
+ /// <summary>
+ /// Creates the test connection and channel.
+ /// </summary>
+ [SetUp]
+ public virtual void Init()
+ {
+ _logger.Info("public virtual void Init(): called");
+
+ try
+ {
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(connectionUri);
+ _connection = new AMQConnection(connectionInfo);
+ _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge, 500, 300);
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error initialisng test fixture: " + e, e);
+ throw e;
+ }
+ }
+
+ /// <summary>
+ /// Disposes the test connection. This is called manually because the connection is a field so dispose will not be automatically
+ /// called on it.
+ /// </summary>
+ [TearDown]
+ public virtual void Shutdown()
+ {
+ _logger.Info("public virtual void Shutdown(): called");
+
+ if (_connection != null)
+ {
+ _logger.Info("Disposing connection.");
+ _connection.Dispose();
+ _logger.Info("Connection disposed.");
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs new file mode 100644 index 0000000000..117dc200d3 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/ChannelQueueTest.cs @@ -0,0 +1,237 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Net;
+using System.Threading;
+using log4net;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+using NUnit.Framework;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Test the queue methods
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class ChannelQueueTest
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(ChannelQueueTest));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+ const string _routingKey = "ServiceQ1";
+
+ private ExceptionListenerDelegate _exceptionDelegate;
+ private AutoResetEvent _evt = new AutoResetEvent(false);
+ private Exception _lastException = null;
+
+ private IMessageConsumer _consumer;
+ private IMessagePublisher _publisher;
+ private IChannel _channel;
+ private IConnection _connection;
+
+ private string _queueName;
+
+ [SetUp]
+ public virtual void Init()
+ {
+ _logger.Info("public virtual void Init(): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(DEFAULT_URI);
+ _connection = new AMQConnection(connectionInfo);
+ _logger.Info("Starting...");
+
+ // Register this to listen for exceptions on the test connection.
+ _exceptionDelegate = new ExceptionListenerDelegate(OnException);
+ _connection.ExceptionListener += _exceptionDelegate;
+
+ // Establish a session on the broker.
+ _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+ // Create a durable, non-temporary, non-exclusive queue.
+ _queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(_queueName, true, false, false);
+
+ _channel.Bind(_queueName, ExchangeNameDefaults.TOPIC, _routingKey);
+
+ // Clear the most recent message and exception.
+ _lastException = null;
+ }
+
+ [TearDown]
+ public virtual void ShutDown()
+ {
+ _logger.Info("public virtual void Shutdown(): called");
+
+ if (_connection != null)
+ {
+ _logger.Info("Disposing connection.");
+ _connection.Dispose();
+ _logger.Info("Connection disposed.");
+ }
+ }
+
+ [Test]
+ public void DeleteUsedQueue()
+ {
+ // Create the consumer
+ _consumer = _channel.CreateConsumerBuilder(_queueName)
+ .WithPrefetchLow(100)
+ .Create();
+ _logger.Info("Consumer was created...");
+
+ // delete the queue
+ _channel.DeleteQueue(_queueName, false, true, true);
+ _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteUnusedQueue()
+ {
+ // delete the queue
+ _channel.DeleteQueue(_queueName, true, true, true);
+ _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteNonEmptyQueue()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+ SendTestMessage("Message 1");
+
+ try
+ {
+ _channel.DeleteQueue(_queueName, true, false, true);
+ }
+ catch (AMQException)
+ {
+ Assert.Fail("The test fails");
+ }
+ }
+
+ [Test]
+ public void DeleteEmptyQueue()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+
+ // delete an empty queue with ifEmpty = true
+ _channel.DeleteQueue(_queueName, false, true, true);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteQueueWithResponse()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ // delete the queue, the server must respond
+ _channel.DeleteQueue(_queueName, false, false, false);
+ }
+
+ [Test]
+ public void PurgeQueueWithResponse()
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Pubisher created");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ _channel.PurgeQueue(_queueName, false);
+ }
+
+ [Test]
+ public void PurgeQueueWithOutResponse()
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Pubisher created");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ _channel.PurgeQueue(_queueName, true);
+ }
+
+
+ /// <summary>
+ /// Callback method to handle any exceptions raised by the test connection.</summary> ///
+ /// <param name="e">The connection exception.</param>
+ public void OnException(Exception e)
+ {
+ // Preserve the most recent exception in case test cases need to examine it.
+ _lastException = e;
+
+ // Notify any waiting threads that an exception event has occurred.
+ _evt.Set();
+ }
+
+ /// <summary>
+ /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
+ /// depending on whether or not the message should be received by the consumer.
+ ///
+ /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
+ /// </summary>
+ ///
+ /// <param name="msgSend">The message to send.</param>
+ private void SendTestMessage(string msg)
+ {
+ // create the IMessage object
+ IMessage msgSend = _channel.CreateTextMessage(msg);
+
+ // send the message
+ _publisher.Send(msgSend);
+ _logger.InfoFormat("The messages \"{0}\" was sent", msg);
+ }
+
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs new file mode 100644 index 0000000000..357f164346 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/ConnectionTest.cs @@ -0,0 +1,73 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ [TestFixture, Category("Integration")]
+ public class ConnectionTest
+ {
+ private AmqBrokerInfo _broker =
+ new AmqBrokerInfo("amqp", "localhost", 5672, false);
+
+ [Test]
+ public void SimpleConnection()
+ {
+ IConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.VirtualHost = "test";
+ connectionInfo.AddBrokerInfo(_broker);
+ using (IConnection connection = new AMQConnection(connectionInfo))
+ {
+ Console.WriteLine("connection = " + connection);
+ }
+ }
+
+ [Test]
+ [ExpectedException(typeof(AMQAuthenticationException))]
+ public void PasswordFailureConnection()
+ {
+ IConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.VirtualHost = "test";
+ connectionInfo.Password = "rubbish";
+ connectionInfo.AddBrokerInfo(_broker);
+
+ using (IConnection connection = new AMQConnection(connectionInfo))
+ {
+ Console.WriteLine("connection = " + connection);
+ // wrong
+ Assert.Fail("Authentication succeeded but should've failed");
+ }
+ }
+
+ [Test]
+ [ExpectedException(typeof(AMQConnectionException))]
+ public void ConnectionFailure()
+ {
+ string url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5673?retries='0''";
+ new AMQConnection(QpidConnectionInfo.FromUrl(url));
+ Assert.Fail("Connection should not be established");
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs b/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs new file mode 100644 index 0000000000..1d1dc622a2 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/HeadersExchangeTest.cs @@ -0,0 +1,268 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Framing;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Sets up a producer/consumer pair to send test messages through a header exchange. The header exchange matching pattern is tested to
+ /// verify that it correctly matches or filters out messages based on their headers.
+ ///
+ /// Check that a message matching all fields of a headers exchange is passed by the exchange.
+ /// Check that a message containing values for empty fields of a headers exchange is passed by the exchange.
+ /// Check that a message matching only some fields of a headers exhcnage is not passed by the exchange.
+ /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by the exchange.
+ /// </summary>
+ ///
+ /// <todo>Remove the HeadersMatchingProducer class and rename this to HeaderExchangeTest. The producer and consumer are implemented
+ /// in a single test class to make running this as part of an automated test suite possible.</todo>
+ ///
+ /// <todo>Consider not using a delegate to callback the OnMessage method. Easier to just call receive on the consumer but using the
+ /// callback does demonstrate how to do so.</todo>
+ [TestFixture, Category("Integration")]
+ public class HeadersExchangeTest : BaseMessagingTestFixture
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(HeadersExchangeTest));
+
+ /// <summary> Holds the default test timeout for broker communications before tests give up. </summary>
+ private static readonly int TIMEOUT = 1000;
+
+ /// <summary> Holds the name of the headers exchange to create to send test messages on. </summary>
+ private string _exchangeName = "ServiceQ1";
+
+ /// <summary> Used to preserve the most recent exception in case test cases need to examine it. </summary>
+ private Exception _lastException = null;
+
+ /// <summary> Used to preserve the most recent message from the test consumer. </summary>
+ private IMessage _lastMessage = null;
+
+ /// <summary> The test consumer to get messages from the broker with. </summary>
+ private IMessageConsumer _consumer;
+
+ private IMessagePublisher _publisher;
+
+ private AutoResetEvent _evt = new AutoResetEvent(false);
+
+ private MessageReceivedDelegate _msgRecDelegate;
+ private ExceptionListenerDelegate _exceptionDelegate;
+
+ [SetUp]
+ public override void Init()
+ {
+ // Ensure that the base init method is called. It establishes a connection with the broker.
+ base.Init();
+
+ _logger.Info("Starting...");
+ _logger.Info("Exchange name is '" + _exchangeName + "'...");
+
+ // Register this to listen for exceptions on the test connection.
+ _exceptionDelegate = new ExceptionListenerDelegate(OnException);
+ _connection.ExceptionListener += _exceptionDelegate;
+
+ // Declare a new headers exchange with the name of the test service.
+ _channel.DeclareExchange(_exchangeName, ExchangeClassConstants.HEADERS);
+
+ // Create a non-durable, temporary (aka auto-delete), exclusive queue.
+ string queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ // Bind the queue to the new headers exchange, setting up some header patterns for the exchange to match.
+ _channel.Bind(queueName, _exchangeName, null, CreatePatternAsFieldTable());
+
+ // Create a test consumer to consume messages from the test exchange.
+ _consumer = _channel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(100)
+ .WithPrefetchHigh(500)
+ .WithNoLocal(false) // make sure we get our own messages
+ .Create();
+
+ // Register this to listen for messages on the consumer.
+ _msgRecDelegate = new MessageReceivedDelegate(OnMessage);
+ _consumer.OnMessage += _msgRecDelegate;
+
+ // Clear the most recent message and exception.
+ _lastException = null;
+ _lastMessage = null;
+
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(_exchangeName)
+ .WithMandatory(true)
+ .Create();
+
+ _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+ // Start all channel
+ _connection.Start();
+ }
+
+ /// <summary>
+ /// Deregisters the on message delegate before closing the connection.
+ /// </summary>
+ [TearDown]
+ public override void Shutdown()
+ {
+ _logger.Info("public void Shutdown(): called");
+
+ //_consumer.OnMessage -= _msgRecDelegate;
+ //_connection.ExceptionListener -= _exceptionDelegate;
+
+ _connection.Stop();
+
+ base.Shutdown();
+ }
+
+ /// <summary>
+ /// Callback method that is passed any messages received on the test channel.
+ /// </summary>
+ ///
+ /// <param name="message">The received message.</param>
+ public void OnMessage(IMessage message)
+ {
+ _logger.Debug(string.Format("message.Type = {0}", message.GetType()));
+ _logger.Debug("Got message '" + message + "'");
+
+ // Preserve the most recent exception so that test cases can examine it.
+ _lastMessage = message;
+
+ // Notify any waiting threads that a message has been received.
+ _evt.Set();
+ }
+
+ /// <summary>Callback method to handle any exceptions raised by the test connection.</summary>
+ ///
+ /// <param name="e">The connection exception.</param>
+ public void OnException(Exception e)
+ {
+ // Preserve the most recent exception in case test cases need to examine it.
+ _lastException = e;
+
+ // Notify any waiting threads that an exception event has occurred.
+ _evt.Set();
+ }
+
+ /// <summary>Check that a message matching all fields of a headers exchange is passed by the exchange.</summary>
+ [Test]
+ public void TestMatchAll()
+ {
+ IMessage msg = _channel.CreateTextMessage("matches match2=''");
+ msg.Headers["match1"] = "foo";
+ msg.Headers["match2"] = "";
+
+ // Use the SendTestMessage helper method to verify that the message was sent and received.
+ SendTestMessage(msg, true);
+ }
+
+ /// <summary>Check that a message containing values for empty fields of a headers exchange is passed by the exchange.</summary>
+ [Test]
+ public void TestMatchEmptyMatchesAnything()
+ {
+ // Send a test message that matches the headers exchange.
+ IMessage msg = _channel.CreateTextMessage("matches match1='foo' and match2='bar'");
+ msg.Headers["match1"] = "foo";
+ msg.Headers["match2"] = "bar";
+
+ // Use the SendTestMessage helper method to verify that the message was sent and received.
+ SendTestMessage(msg, true);
+ }
+
+ /// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary>
+ [Test]
+ public void TestMatchOneFails()
+ {
+ IMessage msg = _channel.CreateTextMessage("not match - only match1");
+ msg.Headers["match1"] = "foo";
+
+ // Use the SendTestMessage helper method to verify that the message was sent and not received.
+ SendTestMessage(msg, false);
+ }
+
+ /// <summary>
+ /// Check that a message with additional fields to the correct matching fields of a headers exchange is passed by
+ /// the exchange.
+ /// </summary>
+ [Test]
+ public void TestMatchExtraFields()
+ {
+ IMessage msg = _channel.CreateTextMessage("matches - extra headers");
+ msg.Headers["match1"] = "foo";
+ msg.Headers["match2"] = "bar";
+ msg.Headers["match3"] = "not required";
+
+ // Use the SendTestMessage helper method to verify that the message was sent and received.
+ SendTestMessage(msg, true);
+ }
+
+ /// <summary>
+ /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
+ /// depending on whether or not the message should be received by the consumer.
+ ///
+ /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
+ /// </summary>
+ ///
+ /// <param name="msgSend">The message to send.</param>
+ /// <param name="shouldPass">A flag to indicate whether or not the message should be received by the consumer.</param>
+ private void SendTestMessage(IMessage msgSend, bool shouldPass)
+ {
+ _publisher.Send(msgSend);
+ _evt.WaitOne(TIMEOUT, true);
+
+ // Check that an exception other than not routable was raised in which case re-raise it as a test error.
+ if (_lastException != null && !(_lastException.InnerException is AMQUndeliveredException))
+ {
+ Assert.Fail("Exception {0} was raised by the broker connection.", _lastException);
+ }
+ // Check that a message was returned if the test is expecting the message to pass.
+ else if (shouldPass)
+ {
+ Assert.IsNotNull(_lastMessage, "Did not get a matching message from the headers exchange.");
+ }
+ // Check that a not routable exception was raised if the test is expecting the message to fail.
+ else if (_lastException != null && _lastException.InnerException is AMQUndeliveredException)
+ {
+ Assert.IsNull(_lastMessage, "Message could not be routed so consumer should not have received it.");
+ }
+ // The broker did not respond within the test timeout so fail the test.
+ else
+ {
+ Assert.Fail("The test timed out without a response from the broker.");
+ }
+ }
+
+ /// <summary> Returns a field table containing patterns to match the test header exchange against. </summary>
+ ///
+ /// <returns> A field table containing test patterns. </returns>
+ private FieldTable CreatePatternAsFieldTable()
+ {
+ FieldTable matchTable = new FieldTable();
+
+ matchTable["match1"] = "foo";
+ matchTable["match2"] = "";
+ matchTable["x-match"] = "all";
+
+ return matchTable;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs b/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs new file mode 100644 index 0000000000..08b7890b26 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/MandatoryMessageTest.cs @@ -0,0 +1,232 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ [TestFixture, Category("Integration")]
+ public class MandatoryMessageTest
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(MandatoryMessageTest));
+
+ /// <summary>Specifies the number of times to run the test cycle.</summary>
+ const int NUM_MESSAGES = 10;
+
+ /// <summary>Determines how many messages to send within each commit.</summary>
+ const int COMMIT_BATCH_SIZE = 1;
+
+ /// <summary>Specifies the duration of the pause to place between each message sent in the test.</summary>
+ //const int SLEEP_MILLIS = 1;
+
+ /// <summary>Specified the maximum time in milliseconds to wait for the test to complete.</summary>
+ const int TIMEOUT = 10000;
+
+ /// <summary>Defines the number of test messages to send, before prompting the user to fail a broker.</summary>
+ const int FAIL_POINT = 5;
+
+ /// <summary>Specified the ack mode to use for the test.</summary>
+ AcknowledgeMode _acknowledgeMode = AcknowledgeMode.AutoAcknowledge;
+
+ /// <summary>Determines whether this test runs transactionally or not. </summary>
+ bool transacted = false;
+
+ /// <summary>Holds the connection to run the test over.</summary>
+ AMQConnection _connection;
+
+ /// <summary>Holds the channel for the test message publisher. </summary>
+ IChannel publishingChannel;
+
+ /// <summary>Holds the test message publisher. </summary>
+ IMessagePublisher publisher;
+
+ /// <summary>Used to keep count of the number of messages sent. </summary>
+ int messagesSent;
+
+ /// <summary>Used to keep count of the number of messages received. </summary>
+ int messagesReceived;
+
+ /// <summary>Used to wait for test completion on. </summary>
+ private static object testComplete = new Object();
+
+ /// <summary>
+ /// </summary>
+ /// [SetUp]
+ public void Init(IConnectionInfo connectionInfo)
+ {
+ // Reset all counts.
+ messagesSent = 0;
+ messagesReceived = 0;
+
+ // Create a connection for the test.
+ _connection = new AMQConnection(connectionInfo);
+
+ // Create a consumer to receive the test messages.
+ IChannel receivingChannel = _connection.CreateChannel(false, _acknowledgeMode);
+
+ string queueName = receivingChannel.GenerateUniqueName();
+ receivingChannel.DeclareQueue(queueName, false, true, true);
+ receivingChannel.Bind(queueName, "amq.direct", queueName);
+
+ IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(30)
+ .WithPrefetchHigh(60).Create();
+
+ consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ _connection.Start();
+
+ // Create a publisher to send the test messages.
+ publishingChannel = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge);
+ publisher = publishingChannel.CreatePublisherBuilder()
+ .WithRoutingKey(queueName)
+ .Create();
+
+ _log.Debug("connection = " + _connection);
+ _log.Debug("connectionInfo = " + connectionInfo);
+ _log.Debug("connection.AsUrl = " + _connection.toURL());
+ _log.Debug("AcknowledgeMode is " + _acknowledgeMode);
+ }
+
+ /// <summary>
+ /// Clean up the test connection.
+ /// </summary>
+ [TearDown]
+ public virtual void Shutdown()
+ {
+ Thread.Sleep(2000);
+ _connection.Close();
+ }
+
+ /// <summary>
+ /// Runs a failover test, with the failover configuration specified in the Qpid connection URL format.
+ /// </summary>
+ [Test]
+ public void TestWithUrl()
+ {
+ _log.Debug("public void runTestWithUrl(): called");
+
+ // Parse the connection parameters from a URL.
+ String clientId = "failover" + DateTime.Now.Ticks;
+ string defaultUrl = "amqp://guest:guest@" + clientId + "/test" +
+ "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'";
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(defaultUrl);
+
+ Init(connectionInfo);
+ DoMandatoryMessageTest();
+ }
+
+ /// <summary>
+ /// Send the test messages, prompting at the fail point for the user to cause a broker failure. The test checks that all messages sent
+ /// are received within the test time limit.
+ /// </summary>
+ ///
+ /// <param name="connectionInfo">The connection parameters, specifying the brokers to fail between.</param>
+ void DoMandatoryMessageTest()
+ {
+ _log.Debug("void DoMandatoryMessageTest(IConnectionInfo connectionInfo): called");
+
+ for (int i = 1; i <= NUM_MESSAGES; ++i)
+ {
+ ITextMessage msg = publishingChannel.CreateTextMessage("message=" + messagesSent);
+ //_log.Debug("sending message = " + msg.Text);
+ publisher.Send(msg);
+ messagesSent++;
+
+ _log.Debug("messagesSent = " + messagesSent);
+
+ if (transacted)
+ {
+ publishingChannel.Commit();
+ }
+ }
+
+ // Wait for all of the test messages to be received, checking that this occurs within the test time limit.
+ bool withinTimeout;
+
+ lock(testComplete)
+ {
+ withinTimeout = Monitor.Wait(testComplete, TIMEOUT);
+ }
+
+ if (!withinTimeout)
+ {
+ Assert.Fail("Test timed out, before all messages received.");
+ }
+
+ _log.Debug("void DoMandatoryMessageTest(IConnectionInfo connectionInfo): exiting");
+ }
+
+ /// <summary>
+ /// Receives all of the test messages.
+ /// </summary>
+ ///
+ /// <param name="message">The newly arrived test message.</param>
+ public void OnMessage(IMessage message)
+ {
+ try
+ {
+ if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+ {
+ message.Acknowledge();
+ }
+
+ messagesReceived++;
+
+ _log.Debug("messagesReceived = " + messagesReceived);
+
+ // Check if all of the messages in the test have been received, in which case notify the message producer that the test has
+ // succesfully completed.
+ if (messagesReceived == NUM_MESSAGES)
+ {
+ lock (testComplete)
+ {
+ Monitor.Pulse(testComplete);
+ }
+ }
+ }
+ catch (QpidException e)
+ {
+ _log.Fatal("Exception received. About to stop.", e);
+ Stop();
+ }
+ }
+
+ // <summary>Closes the test connection.</summary>
+ private void Stop()
+ {
+ _log.Debug("Stopping...");
+ try
+ {
+ _connection.Close();
+ }
+ catch (QpidException e)
+ {
+ _log.Debug("Failed to shutdown: ", e);
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs b/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs new file mode 100644 index 0000000000..1cdacfe778 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/ProducerMultiConsumerTest.cs @@ -0,0 +1,127 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ [TestFixture, Category("Integration")]
+ public class ProducerMultiConsumerTest : BaseMessagingTestFixture
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ProducerMultiConsumerTest));
+
+ private string _commandQueueName = "ServiceQ1";
+
+ private const int CONSUMER_COUNT = 5;
+
+ private const int MESSAGE_COUNT = 1000;
+
+ private const string MESSAGE_DATA_BYTES = "****jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ AutoResetEvent _finishedEvent = new AutoResetEvent(false);
+
+ private static String GetData(int size)
+ {
+ StringBuilder buf = new StringBuilder(size);
+ int count = 0;
+ while (count < size + MESSAGE_DATA_BYTES.Length)
+ {
+ buf.Append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.Length;
+ }
+ if (count < size)
+ {
+ buf.Append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.ToString();
+ }
+
+ private IMessagePublisher _publisher;
+
+ private IMessageConsumer[] _consumers = new IMessageConsumer[CONSUMER_COUNT];
+
+ private int _messageReceivedCount = 0;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithRoutingKey(_commandQueueName)
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .Create();
+
+ _publisher.DisableMessageTimestamp = true;
+ _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+ for (int i = 0; i < CONSUMER_COUNT; i++)
+ {
+ string queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName);
+
+ _consumers[i] = _channel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(100).Create();
+ _consumers[i].OnMessage = new MessageReceivedDelegate(OnMessage);
+ }
+ _connection.Start();
+ }
+
+ public void OnMessage(IMessage m)
+ {
+ int newCount = Interlocked.Increment(ref _messageReceivedCount);
+ if (newCount % 1000 == 0) _logger.Info("Received count=" + newCount);
+ if (newCount == (MESSAGE_COUNT * CONSUMER_COUNT))
+ {
+ _logger.Info("All messages received");
+ _finishedEvent.Set();
+ }
+ if ( newCount % 100 == 0 )
+ System.Diagnostics.Debug.WriteLine(((ITextMessage)m).Text);
+ }
+
+ [Test]
+ public void RunTest()
+ {
+ for (int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ ITextMessage msg;
+ try
+ {
+ msg = _channel.CreateTextMessage(GetData(512 + 8*i));
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Error creating message: " + e, e);
+ break;
+ }
+ _publisher.Send(msg);
+ }
+ _finishedEvent.WaitOne();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs b/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs new file mode 100644 index 0000000000..9ccbdcbf79 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/SslConnectionTest.cs @@ -0,0 +1,64 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Reflection;
+using System.Security.Cryptography.X509Certificates;
+using NUnit.Framework;
+using Apache.Qpid.Client.Qms;
+using Apache.Qpid.Client;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Test SSL/TLS connections to the broker
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class SslConnectionTest
+ {
+ /// <summary>
+ /// Make a test TLS connection to the broker
+ /// without using client-certificates
+ /// </summary>
+ [Test]
+ public void DoSslConnection()
+ {
+ // because for tests we don't usually trust the server certificate
+ // we need here to tell the client to ignore certificate validation errors
+ SslOptions sslConfig = new SslOptions(null, true);
+
+ MakeBrokerConnection(sslConfig);
+ }
+
+ private static void MakeBrokerConnection(SslOptions options)
+ {
+ IConnectionInfo connectionInfo = new QpidConnectionInfo();
+ connectionInfo.VirtualHost = "test";
+ connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 8672, options));
+
+ using ( IConnection connection = new AMQConnection(connectionInfo) )
+ {
+ Console.WriteLine("connection = " + connection);
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/SyncConsumerTest.cs b/dotnet/Qpid.Integration.Tests/testcases/SyncConsumerTest.cs new file mode 100644 index 0000000000..f813764cb1 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/SyncConsumerTest.cs @@ -0,0 +1,127 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ [TestFixture, Category("Integration")]
+ public class SyncConsumerTest : BaseMessagingTestFixture
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(SyncConsumerTest));
+
+ private string _commandQueueName = "ServiceQ1";
+ private const int MESSAGE_COUNT = 1000;
+ private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk ";
+
+ private static String GetData(int size)
+ {
+ StringBuilder buf = new StringBuilder(size);
+ int count = 0;
+ while ( count < size + MESSAGE_DATA_BYTES.Length )
+ {
+ buf.Append(MESSAGE_DATA_BYTES);
+ count += MESSAGE_DATA_BYTES.Length;
+ }
+ if ( count < size )
+ {
+ buf.Append(MESSAGE_DATA_BYTES, 0, size - count);
+ }
+
+ return buf.ToString();
+ }
+
+ private IMessageConsumer _consumer;
+ private IMessagePublisher _publisher;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithRoutingKey(_commandQueueName)
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .Create();
+
+ _publisher.DisableMessageTimestamp = true;
+ _publisher.DeliveryMode = DeliveryMode.NonPersistent;
+
+ string queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(queueName, false, true, true);
+
+ _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName);
+
+ _consumer = _channel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(100).Create();
+ _connection.Start();
+ }
+
+ [Test]
+ public void ReceiveWithInfiniteWait()
+ {
+ // send all messages
+ for ( int i = 0; i < MESSAGE_COUNT; i++ )
+ {
+ ITextMessage msg;
+ try
+ {
+ msg = _channel.CreateTextMessage(GetData(512 + 8 * i));
+ } catch ( Exception e )
+ {
+ _logger.Error("Error creating message: " + e, e);
+ break;
+ }
+ _publisher.Send(msg);
+ }
+
+ _logger.Debug("All messages sent");
+ // receive all messages
+ for ( int i = 0; i < MESSAGE_COUNT; i++ )
+ {
+ try
+ {
+ IMessage msg = _consumer.Receive();
+ Assert.IsNotNull(msg);
+ } catch ( Exception e )
+ {
+ _logger.Error("Error receiving message: " + e, e);
+ Assert.Fail(e.ToString());
+ }
+ }
+ }
+
+ [Test]
+ public void ReceiveWithTimeout()
+ {
+ ITextMessage msg = _channel.CreateTextMessage(GetData(512 + 8));
+ _publisher.Send(msg);
+
+ IMessage recvMsg = _consumer.Receive();
+ Assert.IsNotNull(recvMsg);
+ // empty queue, should timeout
+ Assert.IsNull(_consumer.Receive(1000));
+ }
+ }
+}
diff --git a/dotnet/Qpid.Integration.Tests/testcases/UndeliverableTest.cs b/dotnet/Qpid.Integration.Tests/testcases/UndeliverableTest.cs new file mode 100644 index 0000000000..d240c828f1 --- /dev/null +++ b/dotnet/Qpid.Integration.Tests/testcases/UndeliverableTest.cs @@ -0,0 +1,128 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using NUnit.Framework;
+using Apache.Qpid.Messaging;
+
+namespace Apache.Qpid.Integration.Tests.testcases
+{
+ /// <summary>
+ /// Tests that when sending undeliverable messages with the
+ /// mandatory flag set, an exception is raised on the connection
+ /// as the message is bounced back by the broker
+ /// </summary>
+ [TestFixture, Category("Integration")]
+ public class UndeliverableTest : BaseMessagingTestFixture
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest));
+ private ManualResetEvent _event;
+ public const int TIMEOUT = 1000;
+ private Exception _lastException;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _event = new ManualResetEvent(false);
+ _lastException = null;
+
+ try
+ {
+ _connection.ExceptionListener = new ExceptionListenerDelegate(OnException);
+ } catch ( QpidException e )
+ {
+ _logger.Error("Could not add ExceptionListener", e);
+ }
+ }
+
+ public void OnException(Exception e)
+ {
+ // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message.
+
+ _lastException = e;
+ _logger.Error("OnException handler received connection-level exception", e);
+ if ( e is QpidException )
+ {
+ QpidException qe = (QpidException)e;
+ if ( qe.InnerException is AMQUndeliveredException )
+ {
+ AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException;
+ _logger.Error("inner exception is AMQUndeliveredException", ue);
+ _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage()));
+ }
+ }
+ _event.Set();
+ }
+
+ [Test]
+ public void SendUndeliverableMessageOnDefaultExchange()
+ {
+ SendOne("default exchange", null);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnDirectExchange()
+ {
+ SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnTopicExchange()
+ {
+ SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnHeadersExchange()
+ {
+ SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
+ }
+
+ private void SendOne(string exchangeNameFriendly, string exchangeName)
+ {
+ _logger.Info("Sending undeliverable message to " + exchangeNameFriendly);
+
+ // Send a test message to a non-existant queue
+ // on the specified exchange. See if message is returned!
+ MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
+ .WithRoutingKey("Non-existant route key!")
+ .WithMandatory(true); // necessary so that the server bounces the message back
+ if ( exchangeName != null )
+ {
+ builder.WithExchangeName(exchangeName);
+ }
+ IMessagePublisher publisher = builder.Create();
+ publisher.Send(_channel.CreateTextMessage("Hiya!"));
+
+ // check we received an exception on the connection
+ // and that it is of the right type
+ _event.WaitOne(TIMEOUT, true);
+
+ Type expectedException = typeof(AMQUndeliveredException);
+ Exception ex = _lastException;
+ Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException);
+
+ if ( ex.InnerException != null )
+ ex = ex.InnerException;
+
+ Assert.IsInstanceOfType(expectedException, ex);
+ }
+ }
+}
|
