diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
| commit | 7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch) | |
| tree | 3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client.Tests | |
| parent | 8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff) | |
| download | qpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz | |
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client.Tests')
18 files changed, 1778 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs b/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs new file mode 100644 index 0000000000..4e69e07f45 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/Common/BaseMessagingTestFixture.cs @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using NUnit.Framework; +using Qpid.Messaging; + +namespace Qpid.Client.Tests +{ + public class BaseMessagingTestFixture + { + private static ILog _logger = LogManager.GetLogger(typeof(BaseMessagingTestFixture)); + + protected IConnection _connection; + + protected IChannel _channel; + + [SetUp] + public virtual void Init() + { + try + { + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + + bool local = true; + + if (local) + { + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + } + else + { + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01.uk.jpmorgan.com", 8099, false)); + } + _connection = new AMQConnection(connectionInfo); + _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge, 1); + } + catch (QpidException e) + { + _logger.Error("Error initialisng test fixture: " + e, e); + throw e; + } + } + + [TearDown] + public void Shutdown() + { + Console.WriteLine("Shutdown"); + if (_connection != null) + { + Console.WriteLine("Disposing connection"); + _connection.Dispose(); + } + } + } +} diff --git a/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs new file mode 100644 index 0000000000..cf26b42c6a --- /dev/null +++ b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Tests +{ + [TestFixture] + public class HeadersMatchingConsumer : BaseMessagingTestFixture + { + private static ILog _logger = LogManager.GetLogger(typeof(HeadersMatchingConsumer)); + + private string _serviceName = "ServiceQ1"; + + private AutoResetEvent _evt = new AutoResetEvent(false); + + [SetUp] + public override void Init() + { + base.Init(); + + _logger.Info("Starting..."); + + _logger.Info("Service (queue) name is '" + _serviceName + "'..."); + + _connection.ExceptionListener = new ExceptionListenerDelegate(OnException); + + // Declare a new HeadersExchange with the name of the service. + _channel.DeclareExchange(_serviceName, ExchangeClassConstants.HEADERS); + + // Create non-durable, temporary (aka auto-delete), exclusive queue. + string queueName = _channel.GenerateUniqueName(); + _channel.DeclareQueue(queueName, false, true, true); + + // Bind our queue to the new HeadersExchange. + _channel.Bind(queueName, _serviceName, null, CreatePatternAsFieldTable()); + + IMessageConsumer consumer = _channel.CreateConsumerBuilder(queueName) + .withPrefetch(100) + .withNoLocal(true) + .Create(); + + consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + } + + [Test] + public void Test() + { + _connection.Start(); + _logger.Info("Waiting..."); + _evt.WaitOne(); + } + + public void OnMessage(IMessage message) + { + _logger.Info(string.Format("message.Type = {0}", message.GetType())); + _logger.Info("Got message '" + message + "'"); + } + + private FieldTable CreatePatternAsFieldTable() + { + FieldTable matchTable = new FieldTable(); + // Currently all String matching must be prefixed by an "S" ("S" for string because of a failing of the FieldType definition). + matchTable["Smatch1"] = "foo"; + matchTable["Smatch2"] = ""; + return matchTable; + } + + public void OnException(Exception e) + { + if (e is QpidException && e.InnerException is AMQDisconnectedException) + { + _logger.Error("Broker closed connection"); + } + else + { + _logger.Error("Connection exception occurred: " + e); + } + _evt.Set(); + } + } +} diff --git a/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs new file mode 100644 index 0000000000..35f6017f48 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using NUnit.Framework; +using Qpid.Messaging; + +namespace Qpid.Client.Tests +{ + [TestFixture] + public class HeadersMatchingProducer : BaseMessagingTestFixture + { + private static ILog _logger = LogManager.GetLogger(typeof(HeadersMatchingProducer)); + + private string _commandExchangeName = "ServiceQ1"; + + private int _messageCount = 12; + + private IMessagePublisher _publisher; + + [SetUp] + public override void Init() + { + base.Init(); + + try + { + _publisher = _channel.CreatePublisherBuilder() + .withExchangeName(_commandExchangeName) + .withMandatory(true) + .Create(); + + // Disabling timestamps - a performance optimisation where timestamps and TTL/expiration + // are not required. + _publisher.DisableMessageTimestamp = true; + + _publisher.DeliveryMode = DeliveryMode.NonPersistent; + } + catch (QpidException e) + { + _logger.Error("Error: " + e, e); + } + } + + [Test] + public void SendMessages() + { + _connection.Start(); + for (int i = 0; i < _messageCount; i++) + { + int rem = i % 6; + IMessage msg = null; + switch (rem) + { + case 0: + msg = _channel.CreateTextMessage("matches match2='bar'"); + msg.Headers["match1"] = "foo"; + msg.Headers["match2"] = "bar"; + break; + + case 1: + msg = _channel.CreateTextMessage("not match - only match1"); + msg.Headers["match1"] = "foo"; + break; + + case 2: + msg = _channel.CreateTextMessage("not match - only match2"); + msg.Headers["match2"] = "bar"; + break; + + case 3: + msg = _channel.CreateTextMessage("matches match2=''"); + msg.Headers["match1"] = "foo"; + msg.Headers["match2"] = ""; + break; + + case 4: + msg = _channel.CreateTextMessage("matches - extra headers"); + msg.Headers["match1"] = "foo"; + msg.Headers["match2"] = "bar"; + msg.Headers["match3"] = "not required"; + break; + + case 5: + msg = _channel.CreateTextMessage("5: no match"); + msg.Headers["match1"] = "foo1"; + msg.Headers["match2"] = "bar"; + msg.Headers["match3"] = "not required"; + break; + } + _publisher.Send(msg); + } + _logger.Info("Finished sending " + _messageCount + " messages"); + } + } +} diff --git a/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs b/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs new file mode 100644 index 0000000000..6b6fca20b2 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Messaging; + +namespace Qpid.Client.Tests +{ + [TestFixture] + public class ProducerMultiConsumer : BaseMessagingTestFixture + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ProducerMultiConsumer)); + + private string _commandQueueName = "ServiceQ1"; + + private const int CONSUMER_COUNT = 5; + + private const int MESSAGE_COUNT = 1000; + + private const string MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk "; + + AutoResetEvent _finishedEvent = new AutoResetEvent(false); + + private static String GetData(int size) + { + StringBuilder buf = new StringBuilder(size); + int count = 0; + while (count < size + MESSAGE_DATA_BYTES.Length) + { + buf.Append(MESSAGE_DATA_BYTES); + count += MESSAGE_DATA_BYTES.Length; + } + if (count < size) + { + buf.Append(MESSAGE_DATA_BYTES, 0, size - count); + } + + return buf.ToString(); + } + + private IMessagePublisher _publisher; + + private IMessageConsumer[] _consumers = new IMessageConsumer[CONSUMER_COUNT]; + + private int _messageReceivedCount = 0; + + [SetUp] + public override void Init() + { + base.Init(); + _publisher = _channel.CreatePublisherBuilder() + .withRoutingKey(_commandQueueName) + .withExchangeName(ExchangeNameDefaults.TOPIC) + .Create(); + + _publisher.DisableMessageTimestamp = true; + _publisher.DeliveryMode = DeliveryMode.NonPersistent; + + for (int i = 0; i < CONSUMER_COUNT; i++) + { + string queueName = _channel.GenerateUniqueName(); + _channel.DeclareQueue(queueName, false, true, true); + + _channel.Bind(queueName, ExchangeNameDefaults.TOPIC, _commandQueueName); + + _consumers[i] = _channel.CreateConsumerBuilder(queueName) + .withPrefetch(100).Create(); + _consumers[i].OnMessage = new MessageReceivedDelegate(OnMessage); + } + _connection.Start(); + } + + public void OnMessage(IMessage m) + { + int newCount = Interlocked.Increment(ref _messageReceivedCount); + if (newCount % 1000 == 0) _logger.Info("Received count=" + newCount); + if (newCount == (MESSAGE_COUNT * CONSUMER_COUNT)) + { + _logger.Info("All messages received"); + _finishedEvent.Set(); + } + } + + [Test] + public void RunTest() + { + for (int i = 0; i < MESSAGE_COUNT; i++) + { + ITextMessage msg; + try + { + msg = _channel.CreateTextMessage(GetData(512 + 8*i)); + } + catch (Exception e) + { + _logger.Error("Error creating message: " + e, e); + break; + } + _publisher.Send(msg); + } + _finishedEvent.WaitOne(); + } + } +} diff --git a/dotnet/Qpid.Client.Tests/Properties/AssemblyInfo.cs b/dotnet/Qpid.Client.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..3a2842c210 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Reflection; +using System.Runtime.InteropServices; +using log4net.Config; +[assembly: XmlConfigurator(ConfigFile="log4net.config")] + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Qpid.Client.Tests")] +[assembly: AssemblyDescription("Test Suite for Qpid Clients")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Apache Qpid")] +[assembly: AssemblyProduct("Qpid.Client.Tests")] +[assembly: AssemblyCopyright("Copyright (c) 2006 The Apache Software Foundation")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("7ebdea21-1352-4673-b66e-fdc0beff461f")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +[assembly: AssemblyVersion("0.5.*")] diff --git a/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj b/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj new file mode 100644 index 0000000000..c3e957ae09 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj @@ -0,0 +1,95 @@ +<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.50727</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{BA1B0032-4CE6-40DD-A2DC-119F0FFA0A1D}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Qpid.Client.Tests</RootNamespace>
+ <AssemblyName>Qpid.Client.Tests</AssemblyName>
+ <StartupObject>
+ </StartupObject>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="log4net, Version=1.2.9.0, Culture=neutral, PublicKeyToken=b32731d11ce58905">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Qpid.Common\lib\log4net\log4net.dll</HintPath>
+ </Reference>
+ <Reference Include="nunit.framework, Version=2.2.6.0, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>lib\nunit\nunit.framework.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="bio\BlockingIo.cs" />
+ <Compile Include="connection\ConnectionTest.cs" />
+ <Compile Include="failover\FailoverTest.cs" />
+ <Compile Include="failover\FailoverTxTest.cs" />
+ <Compile Include="HeadersExchange\HeadersMatchingConsumer.cs" />
+ <Compile Include="HeadersExchange\HeadersMatchingProducer.cs" />
+ <Compile Include="MultiConsumer\ProducerMultiConsumer.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Common\BaseMessagingTestFixture.cs" />
+ <Compile Include="requestreply1\ServiceProvidingClient.cs" />
+ <Compile Include="requestreply1\ServiceRequestingClient.cs" />
+ <Compile Include="undeliverable\UndeliverableTest.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Qpid.Client.Transport.Socket.Blocking\Qpid.Client.Transport.Socket.Blocking.csproj">
+ <Project>{52AC4940-2077-4104-A753-29A9C8C16957}</Project>
+ <Name>Qpid.Client.Transport.Socket.Blocking</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Messaging\Qpid.Messaging.csproj">
+ <Project>{6688F826-C58E-4C1B-AA1F-22AFAB4B7D07}</Project>
+ <Name>Qpid.Messaging</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Client\Qpid.Client.csproj">
+ <Project>{68987C05-3768-452C-A6FC-6BA1D372852F}</Project>
+ <Name>Qpid.Client</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Common\Qpid.Common.csproj">
+ <Project>{77064C42-24D2-4CEB-9EA2-0EF481A43205}</Project>
+ <Name>Qpid.Common</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="log4net.config">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </None>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="Qpid.Common.DLL.config">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </None>
+ </ItemGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file diff --git a/dotnet/Qpid.Client.Tests/Qpid.Common.DLL.config b/dotnet/Qpid.Client.Tests/Qpid.Common.DLL.config new file mode 100644 index 0000000000..e1300549d7 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/Qpid.Common.DLL.config @@ -0,0 +1,5 @@ +<configuration> + <assemblySettings> + <add key="OpenAMQ1d4Compatibility" value="false"/> + </assemblySettings> +</configuration>
\ No newline at end of file diff --git a/dotnet/Qpid.Client.Tests/bio/BlockingIo.cs b/dotnet/Qpid.Client.Tests/bio/BlockingIo.cs new file mode 100644 index 0000000000..24f3299dae --- /dev/null +++ b/dotnet/Qpid.Client.Tests/bio/BlockingIo.cs @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using NUnit.Framework; +using Qpid.Client.Protocol; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Transport +{ + [TestFixture] + public class BlockingIo + { + [Test] + public void connectFromOutside() + { + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + AMQConnection connection = new AMQConnection(connectionInfo); + ProtocolWriter protocolWriter = connection.ConvenientProtocolWriter; + + // TODO: Open channels and handle them simultaneously. + // Need more thread here? + // Send ChannelOpen. + ushort channelId = 1; + protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody)); + + connection.Close(); + } + + [Test] + public void regularConnection() + { + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + using (IConnection connection = new AMQConnection(connectionInfo)) { + Console.WriteLine("connection = {0}", connection); + Thread.Sleep(2000); + } + } + + [Test] + public void connectionAndSleepForHeartbeats() + { + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + using (IConnection connection = new AMQConnection(connectionInfo)) + { + Console.WriteLine("connection = {0}", connection); + Thread.Sleep(60000); + } + } + } +} diff --git a/dotnet/Qpid.Client.Tests/connection/ConnectionTest.cs b/dotnet/Qpid.Client.Tests/connection/ConnectionTest.cs new file mode 100644 index 0000000000..d6d7639c21 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/connection/ConnectionTest.cs @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using NUnit.Framework; +using Qpid.Client.qms; +using Qpid.Messaging; + +namespace Qpid.Client.Tests.connection +{ + [TestFixture] + public class ConnectionTest + { + [Test] + public void simpleConnection() + { + ConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + using (IConnection connection = new AMQConnection(connectionInfo)) + { + Console.WriteLine("connection = " + connection); + } + } + + [Test] + public void passwordFailureConnection() + { + ConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.setPassword("rubbish"); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo()); + try + { + using (IConnection connection = new AMQConnection(connectionInfo)) + { + Console.WriteLine("connection = " + connection); + } + } + catch (AMQException) + { + Assert.Fail(); +// if (!(e is AMQAuthenticationException)) +// { +// Assert.Fail("Expected AMQAuthenticationException!"); +// } + } + } +// +// [Test] +// public void connectionFailure() +// { +// try +// { +// new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5673?retries='0''"); +// Assert.fail("Connection should not be established"); +// } +// catch (AMQException amqe) +// { +// if (!(amqe instanceof AMQConnectionException)) +// { +// Assert.fail("Correct exception not thrown"); +// } +// } +// } +// +// [Test] +// public void unresolvedHostFailure() +// { +// try +// { +// new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='tcp://rubbishhost:5672?retries='0''"); +// Assert.fail("Connection should not be established"); +// } +// catch (AMQException amqe) +// { +// if (!(amqe instanceof AMQUnresolvedAddressException)) +// { +// Assert.fail("Correct exception not thrown"); +// } +// } +// } + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client.Tests/default.build b/dotnet/Qpid.Client.Tests/default.build new file mode 100644 index 0000000000..36cdb612b5 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/default.build @@ -0,0 +1,68 @@ +<?xml version="1.0"?> +<project name="XMS.AMQ.ClientTests" default="build"> + <property name="nant.settings.currentframework" value="net-1.0" /> + <property name="basename" value="XMSClientTests"/> + <property name="debug" value="true"/> + <property name="MINADir" value="../minadotnet"/> + <property name="XMSCommonDir" value="../xmscommon"/> + <property name="XMSClientDir" value="../xmsclient"/> +<!-- + <property name="NunitDir" value="../build/nunit"/> + <property name="NunitDir" value="C:\Program Files\NUnit-Net-2.0 2.2.8"/> +--> + <property name="NunitDir" value="C:\Program Files\NUnit 2.2.8"/> + + <if test="${debug}"> + <property name="targetdir" value="bin/${nant.settings.currentframework}/Debug"/> + </if> + <ifnot test="${debug}"> + <property name="targetdir" value="bin/${nant.settings.currentframework}/Release"/> + </ifnot> + + <target name="clean"> + <delete> + <fileset> + <include name="${targetdir}/${basename}.dll"/> + <include name="${targetdir}/${basename}.pdb"/> + </fileset> + </delete> + </target> + + <target name="init"> + <mkdir dir="${targetdir}"/> + </target> + + <target name="build" depends="init"> + <csc target="library" output="${targetdir}/${basename}.dll" debug="${debug}"> + <sources> + <include name="**/*.cs"/> + <exclude name="Properties/Settings.Designer.cs" /> + </sources> + <references> + <lib> + <include name="${MINADir}/${targetdir}" /> + <include name="${XMSCommonDir}/${targetdir}" /> + <include name="${XMSClientDir}/${targetdir}" /> + <include name="${XMSCommonDir}/lib/**" /> + <include name="${NunitDir}/bin" /> + </lib> + <include name="log4net.dll" /> + <include name="MINA.dll" /> + <include name="IBM.XMS.dll" /> + <include name="XMSCommon.dll" /> + <include name="XMSClient.dll" /> + <include name="nunit.framework.dll" /> + </references> + </csc> + <copy todir="${targetdir}"> + <fileset> + <include name="*.config" /> + <include name="${MINADir}/${targetdir}/*.dll" /> + <include name="${XMSCommonDir}/${targetdir}/*.dll" /> + <include name="${XMSClientDir}/${targetdir}/*.dll" /> + <include name="${NunitDir}/bin/nunit.framework.dll" /> + <include name="${XMSCommonDir}/lib/xms/*.dll" /> + </fileset> + </copy> + </target> +</project> diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs new file mode 100644 index 0000000000..6b3ab068f5 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Client.qms; +using Qpid.Messaging; + +namespace Qpid.Client.Tests.failover +{ + [TestFixture] + public class FailoverTest : IConnectionListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverTest)); + + private IConnection _connection; + private IChannel _channel; + private IMessagePublisher _publisher; + private int _count; + + private IMessageConsumer _consumerOfResponse; + + void DoFailoverTest(ConnectionInfo info) + { + DoFailoverTest(new AMQConnection(info)); + } + + void DoFailoverTest(IConnection connection) + { + AMQConnection amqConnection = (AMQConnection)connection; + amqConnection.ConnectionListener = this; + //Console.WriteLine("connection.url = " + amqConnection.ToURL()); + _connection = connection; + _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException); + _channel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge); + + string exchangeName = ExchangeNameDefaults.TOPIC; + string routingKey = "topic1"; + + string queueName = DeclareAndBindTemporaryQueue(exchangeName, routingKey); + + new MsgListener(_connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge), queueName); + + IChannel channel = _channel; + + string tempQueueName = channel.GenerateUniqueName(); + channel.DeclareQueue(tempQueueName, false, true, true); + _consumerOfResponse = channel.CreateConsumerBuilder(tempQueueName).Create(); + _consumerOfResponse.OnMessage = new MessageReceivedDelegate(OnMessage); + + _connection.Start(); + + IMessage msg = _channel.CreateTextMessage("Init"); + // FIXME: Leaving ReplyToExchangeName as default (i.e. the default exchange) + // FIXME: but the implementation might not like this as it defaults to null rather than "". + msg.ReplyToRoutingKey = tempQueueName; +// msg.ReplyTo = new ReplyToDestination("" /* i.e. the default exchange */, tempQueueName); + _logger.Info(String.Format("sending msg.Text={0}", ((ITextMessage)msg).Text)); + +// _publisher = _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); + _publisher = _channel.CreatePublisherBuilder() + .withRoutingKey(routingKey) + .withExchangeName(exchangeName) + .Create(); + _publisher.Send(msg); + } + + public string DeclareAndBindTemporaryQueue(string exchangeName, string routingKey) + { + string queueName = _channel.GenerateUniqueName(); + + // Queue.Declare + _channel.DeclareQueue(queueName, false, true, true); + + // Queue.Bind + _channel.Bind(queueName, exchangeName, routingKey); + return queueName; + } + + private void OnConnectionException(Exception e) + { + _logger.Error("Connection exception occurred", e); + } + + public void OnMessage(IMessage message) + { + try + { + _logger.Info("received message on temp queue msg.Text=" + ((ITextMessage)message).Text); + Thread.Sleep(1000); + _publisher.Send(_channel.CreateTextMessage("Message" + (++_count))); + } + catch (QpidException e) + { + error(e); + } + } + + private void error(Exception e) + { + _logger.Error("exception received", e); + stop(); + } + + private void stop() + { + _logger.Info("Stopping..."); + try + { + _connection.Dispose(); + } + catch (QpidException e) + { + _logger.Error("Failed to shutdown", e); + } + } + + public void BytesSent(long count) + { + } + + public void BytesReceived(long count) + { + } + + public bool PreFailover(bool redirect) + { + _logger.Info("preFailover(" + redirect + ") called"); + return true; + } + + public bool PreResubscribe() + { + _logger.Info("preResubscribe() called"); + return true; + } + + public void FailoverComplete() + { + _logger.Info("failoverComplete() called"); + } + + private class MsgListener + { + private IChannel _session; + private IMessagePublisher _publisher; + + internal MsgListener(IChannel session, string queueName) + { + _session = session; + _session.CreateConsumerBuilder(queueName).Create().OnMessage = + new MessageReceivedDelegate(OnMessage); + } + + public void OnMessage(IMessage message) + { + try + { + _logger.Info("Received: msg.Text = " + ((ITextMessage) message).Text); + if(_publisher == null) + { + _publisher = init(message); + } + reply(message); + } + catch (QpidException e) + { +// Error(e); + _logger.Error("yikes", e); // XXX + } + } + + private void reply(IMessage message) + { + string msg = ((ITextMessage) message).Text; + _logger.Info("sending reply - " + msg); + _publisher.Send(_session.CreateTextMessage(msg)); + } + + private IMessagePublisher init(IMessage message) + { + _logger.Info(string.Format("creating reply producer with dest = '{0}:{1}'", + message.ReplyToExchangeName, message.ReplyToRoutingKey)); + + string exchangeName = message.ReplyToExchangeName; + string routingKey = message.ReplyToRoutingKey; + + //return _channel.CreatePublisher(exchangeName, exchangeClass, routingKey); + return _session.CreatePublisherBuilder() + .withExchangeName(exchangeName) + .withRoutingKey(routingKey) + .Create(); + } + } + + [Test] + public void TestWithBasicInfo() + { + Console.WriteLine("TestWithBasicInfo"); + try + { + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); +// connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); + + //connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01", 7672, false)); + + + //connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01.uk.jpmorgan.com", 8099, false)); + + + DoFailoverTest(connectionInfo); + while (true) + { + Thread.Sleep(5000); + } + } + catch (Exception e) + { + _logger.Error("Exception caught", e); + } + } + +// [Test] +// public void TestWithUrl() +// { +// String clientId = "failover" + DateTime.Now.Ticks; +// String defaultUrl = "amqp://guest:guest@" + clientId + "/test" + +// "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; +// +// _logger.Info("url = [" + defaultUrl + "]"); +// +// // _logger.Info("connection url = [" + new AMQConnectionURL(defaultUrl) + "]"); +// +// String broker = defaultUrl; +// //new FailoverTest(broker); +// } + } +} diff --git a/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs new file mode 100644 index 0000000000..ad1570ed14 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs @@ -0,0 +1,194 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.InteropServices; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Client.qms; +using Qpid.Messaging; + +namespace Qpid.Client.Tests.failover +{ + [TestFixture] + public class FailoverTxTest : IConnectionListener + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverTxTest)); + + const int NUM_ITERATIONS = 3; + const int NUM_MESSAGES = 10; + const int SLEEP_MILLIS = 500; + + AMQConnection _connection; + + public void onMessage(IMessage message) + { + try + { + _log.Info("Received: " + ((ITextMessage) message).Text); + } + catch (QpidException e) + { + error(e); + } + } + + void DoFailoverTxTest(ConnectionInfo connectionInfo) + { + _connection = new AMQConnection(connectionInfo); + _connection.ConnectionListener = this; + _log.Info("connection = " + _connection); + _log.Info("connectionInfo = " + connectionInfo); + _log.Info("connection.asUrl = " + _connection.toURL()); + + IChannel session = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge); + + string queueName = session.GenerateUniqueName(); + + // Queue.Declare + session.DeclareQueue(queueName, false, true, true); + + // No need to call Queue.Bind as automatically bound to default direct exchange. +// channel.Bind(queueName, exchangeName, routingKey); + + session.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage); + + _connection.Start(); + + sendInTx(queueName); + + _connection.Close(); + _log.Info("FailoverTxText complete"); + } + + private void sendInTx(string routingKey) + { + _log.Info("sendInTx"); + bool transacted = false; + IChannel session = _connection.CreateChannel(transacted, AcknowledgeMode.NoAcknowledge); + IMessagePublisher publisher = session.CreatePublisherBuilder() + .withRoutingKey(routingKey) + .Create(); + + for (int i = 1; i <= NUM_ITERATIONS; ++i) + { + for (int j = 1; j <= NUM_MESSAGES; ++j) + { + ITextMessage msg = session.CreateTextMessage("Tx=" + i + " msg=" + j); + _log.Info("sending message = " + msg.Text); + publisher.Send(msg); + Thread.Sleep(SLEEP_MILLIS); + } + if (transacted) session.Commit(); + } + } + + private void error(Exception e) + { + _log.Fatal("Exception received. About to stop.", e); + stop(); + } + + private void stop() + { + _log.Info("Stopping..."); + try + { + _connection.Close(); + } + catch (QpidException e) + { + _log.Info("Failed to shutdown: ", e); + } + } + + public void BytesSent(long count) + { + } + + public void BytesReceived(long count) + { + } + + public bool PreFailover(bool redirect) + { + _log.Info("preFailover(" + redirect + ") called"); + return true; + } + + public bool PreResubscribe() + { + _log.Info("preResubscribe() called"); + return true; + } + + public void FailoverComplete() + { + _log.Info("failoverComplete() called"); + } + + [Test] + public void TestWithBasicInfo() + { + Console.WriteLine("TestWithBasicInfo"); + Console.WriteLine(".NET Framework version: " + RuntimeEnvironment.GetSystemVersion()); + try + { + QpidConnectionInfo connectionInfo = new QpidConnectionInfo(); + + bool local = true; + if (local) + { + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5672, false)); + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "localhost", 5673, false)); + } + else + { + connectionInfo.AddBrokerInfo(new AmqBrokerInfo("amqp", "eqd-lxamq01.uk.jpmorgan.com", 8099, false)); + } + + DoFailoverTxTest(connectionInfo); + } + catch (Exception e) + { + _log.Error("Exception caught", e); + } + } + + //[Test] + //public void runTestWithUrl() + //{ + // try { + // String clientId = "failover" + DateTime.Now.Ticks; + // string defaultUrl = "amqp://guest:guest@" + clientId + "/test" + + // "?brokerlist='tcp://localhost:5672;tcp://localhost:5673'&failover='roundrobin'"; + + // _log.Info("url = [" + defaultUrl + "]"); + + // _log.Info("connection url = [" + new AMQConnectionInfo(defaultUrl) + "]"); + + // DoFailoverTxTest(new AMQConnectionInfo(defaultUrl)); + // } catch (Exception e) { + // _log.Error("test failed", e); + // } + //} + } +} diff --git a/dotnet/Qpid.Client.Tests/lib/nunit/nunit-licence.txt b/dotnet/Qpid.Client.Tests/lib/nunit/nunit-licence.txt new file mode 100644 index 0000000000..b2316295d3 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/lib/nunit/nunit-licence.txt @@ -0,0 +1,23 @@ +Copyright © 2002-2004 James W. Newkirk, Michael C. Two, Alexei A. Vorontsov, + Charlie Poole +Copyright © 2000-2004 Philip A. Craig + +This software is provided 'as-is', without any express or implied warranty. In +no event will the authors be held liable for any damages arising from the use +of this software. + +Permission is granted to anyone to use this software for any purpose, including +commercial applications, and to alter it and redistribute it freely, subject to +the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim + that you wrote the original software. If you use this software in a product, an + acknowledgment (see the following) in the product documentation is required. + + Portions Copyright © 2002 James W. Newkirk, Michael C. Two, Alexei A. Vorontsov + or Copyright © 2000-2002 Philip A. Craig + +2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + +3. This notice may not be removed or altered from any source distribution. diff --git a/dotnet/Qpid.Client.Tests/lib/nunit/nunit.framework.dll b/dotnet/Qpid.Client.Tests/lib/nunit/nunit.framework.dll Binary files differnew file mode 100644 index 0000000000..53666e74c9 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/lib/nunit/nunit.framework.dll diff --git a/dotnet/Qpid.Client.Tests/log4net.config b/dotnet/Qpid.Client.Tests/log4net.config new file mode 100644 index 0000000000..e0a43a8b5e --- /dev/null +++ b/dotnet/Qpid.Client.Tests/log4net.config @@ -0,0 +1,38 @@ +<log4net> + <appender name="console" type="log4net.Appender.ConsoleAppender" > + <layout type="log4net.Layout.PatternLayout"> + <conversionPattern value="%d [%t] %-5p %c:%M(%L) - %m%n" /> + </layout> + </appender> + + <appender name="protocolLog" type="log4net.Appender.FileAppender"> + <file value="protocol.log"/> + <appendToFile value="false"/> + <layout type="log4net.Layout.PatternLayout"> + <conversionPattern value="%date - %message%newline"/> + </layout> + </appender> + + <appender name="ioLog" type="log4net.Appender.FileAppender"> + <file value="io.log"/> + <appendToFile value="false"/> + <layout type="log4net.Layout.PatternLayout"> + <conversionPattern value="%date - %message%newline"/> + </layout> + </appender> + + <logger name="Qpid.Client.ProtocolChannel.Tracing" additivity="false"> + <level value="info"/> + <appender-ref ref="protocolLog"/> + </logger> + + <logger name="Qpid.Client.ByteChannel.Tracing" additivity="false"> + <level value="info" /> + <appender-ref ref="ioLog"/> + </logger> + + <root> + <level value="info"/> + <appender-ref ref="console"/> + </root> +</log4net>
\ No newline at end of file diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs new file mode 100644 index 0000000000..bb5758c18c --- /dev/null +++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Messaging; + +namespace Qpid.Client.Tests +{ + [TestFixture] + public class ServiceProvidingClient : BaseMessagingTestFixture + { + private static ILog _logger = LogManager.GetLogger(typeof(ServiceProvidingClient)); + + private int _messageCount; + + private string _replyToExchangeName; + private string _replyToRoutingKey; + + private IMessagePublisher _destinationPublisher; + + private string _serviceName = "ServiceQ1"; + + private string _selector = null; + + //private EventWaitHandle _event = new ManualResetEvent(false); + private AutoResetEvent _event = new AutoResetEvent(false); + + [SetUp] + public override void Init() + { + base.Init(); + + _logger.Info("Starting..."); + _logger.Info("Service (queue) name is '" + _serviceName + "'..."); + + _connection.ExceptionListener = new ExceptionListenerDelegate(OnConnectionException); + + _logger.Info("Message selector is <" + _selector + ">..."); + + _channel.DeclareQueue(_serviceName, false, false, false); + + IMessageConsumer consumer = _channel.CreateConsumerBuilder(_serviceName) + .withPrefetch(100) + .withNoLocal(true) + .Create(); + consumer.OnMessage = new MessageReceivedDelegate(OnMessage); + } + + private void OnConnectionException(Exception e) + { + _logger.Info("Connection exception occurred", e); + _event.Set(); // Shutdown test on error + // XXX: Test still doesn't shutdown when broker terminates. Is there no heartbeat? + } + + [Test] + public void Test() + { + _connection.Start(); + _logger.Info("Waiting..."); + _event.WaitOne(); + } + + public void OnMessage(IMessage message) + { +// _logger.Info("Got message '" + message + "'"); + + ITextMessage tm = (ITextMessage)message; + + try + { + string replyToExchangeName = tm.ReplyToExchangeName; + string replyToRoutingKey = tm.ReplyToRoutingKey; + + _replyToExchangeName = replyToExchangeName; + _replyToRoutingKey = replyToRoutingKey; + _logger.Debug("About to create a producer"); + +// Console.WriteLine("ReplyTo.ExchangeName = " + _replyToExchangeName); +// Console.WriteLine("ReplyTo.RoutingKey = " + _replyToRoutingKey); + + _destinationPublisher = _channel.CreatePublisherBuilder() + .withExchangeName(_replyToExchangeName) + .withRoutingKey(_replyToRoutingKey) + .Create(); + _destinationPublisher.DisableMessageTimestamp = true; + _destinationPublisher.DeliveryMode = DeliveryMode.NonPersistent; + _logger.Debug("After create a producer"); + } + catch (QpidException e) + { + _logger.Error("Error creating destination", e); + throw e; + } + _messageCount++; + if (_messageCount % 1000 == 0) + { + _logger.Info("Received message total: " + _messageCount); + _logger.Info(string.Format("Sending response to '{0}:{1}'", + _replyToExchangeName, _replyToRoutingKey)); + } + + try + { + String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.Text; + ITextMessage msg = _channel.CreateTextMessage(payload); + if (tm.Headers.Contains("timeSent")) + { +// _logger.Info("timeSent property set on message"); +// _logger.Info("timeSent value is: " + tm.Headers["timeSent"]); + msg.Headers["timeSent"] = tm.Headers["timeSent"]; + } + _destinationPublisher.Send(msg); + if (_messageCount % 1000 == 0) + { + _logger.Info(string.Format("Sending response to '{0}:{1}'", + _replyToExchangeName, _replyToRoutingKey)); + } + } + catch (QpidException e) + { + _logger.Error("Error sending message: " + e, e); + throw e; + } + } + } +} diff --git a/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs new file mode 100644 index 0000000000..582f022719 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Messaging; + +namespace Qpid.Client.Tests +{ + [TestFixture] + public class ServiceRequestingClient : BaseMessagingTestFixture + { + private const int MESSAGE_SIZE = 1024; + private static string MESSAGE_DATA = new string('x', MESSAGE_SIZE); + + private const int NUM_MESSAGES = 10000; + + private static ILog _log = LogManager.GetLogger(typeof(ServiceRequestingClient)); + + AutoResetEvent _finishedEvent = new AutoResetEvent(false); + + private int _expectedMessageCount = NUM_MESSAGES; + + private long _startTime; + + private string _commandQueueName = "ServiceQ1"; + + private IMessagePublisher _publisher; + + Avergager averager = new Avergager(); + + private void InitialiseProducer() + { + try + { + _publisher = _channel.CreatePublisherBuilder() + .withRoutingKey(_commandQueueName) + .Create(); + _publisher.DisableMessageTimestamp = true; // XXX: need a "with" for this in builder? + _publisher.DeliveryMode = DeliveryMode.NonPersistent; // XXX: need a "with" for this in builder? + } + catch (QpidException e) + { + _log.Error("Error: " + e, e); + } + } + + [Test] + public void SendMessages() + { + InitialiseProducer(); + + string replyQueueName = _channel.GenerateUniqueName(); + + _channel.DeclareQueue(replyQueueName, false, true, true); + + IMessageConsumer messageConsumer = _channel.CreateConsumerBuilder(replyQueueName) + .withPrefetch(100) + .withNoLocal(true) + .withExclusive(true).Create(); + + _startTime = DateTime.Now.Ticks; + + messageConsumer.OnMessage = new MessageReceivedDelegate(OnMessage); + _connection.Start(); + for (int i = 0; i < _expectedMessageCount; i++) + { + ITextMessage msg; + try + { + msg = _channel.CreateTextMessage(MESSAGE_DATA + i); + } + catch (Exception e) + { + _log.Error("Error creating message: " + e, e); + break; + } + msg.ReplyToRoutingKey = replyQueueName; + + // Added timestamp. + long timeNow = DateTime.Now.Ticks; + string timeSentString = String.Format("{0:G}", timeNow); +// _log.Info(String.Format("timeSent={0} timeSentString={1}", timeNow, timeSentString)); + msg.Headers.SetString("timeSent", timeSentString); + //msg.Headers.SetLong("sentAt", timeNow); + + try + { + _publisher.Send(msg); + } + catch (Exception e) + { + _log.Error("Error sending message: " + e, e); + //base._port = 5673; + _log.Info("Reconnecting but on port 5673"); + try + { + base.Init(); + InitialiseProducer(); + // cheesy but a quick test + _log.Info("Calling SendMessages again"); + SendMessages(); + } + catch (Exception ex) + { + _log.Error("Totally busted: failed to reconnect: " + ex, ex); + } + } + } + + // Assert that the test finishes within a reasonable amount of time. + const int waitSeconds = 10; + const int waitMilliseconds = waitSeconds * 1000; + _log.Info("Finished sending " + _expectedMessageCount + " messages"); + _log.Info(String.Format("Waiting {0} seconds to receive last message...", waitSeconds)); + Assert.IsTrue(_finishedEvent.WaitOne(waitMilliseconds, false), + String.Format("Expected to finish in {0} seconds", waitSeconds)); + } + + public void OnMessage(IMessage m) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Message received: " + m); + } + + //if (m.Headers.Contains("sentAt")) + if (!m.Headers.Contains("timeSent")) + { + throw new Exception("Set timeSent!"); + } + //long sentAt = m.Headers.GetLong("sentAt"); + long sentAt = Int64.Parse(m.Headers.GetString("timeSent")); + long now = DateTime.Now.Ticks; + long latencyTicks = now - sentAt; +// _log.Info(String.Format("latency = {0} ticks ", latencyTicks)); + long latencyMilliseconds = latencyTicks / TimeSpan.TicksPerMillisecond; +// _log.Info(String.Format("latency = {0} ms", latencyMilliseconds)); + + averager.Add(latencyMilliseconds); + + // Output average every 1000 messages. + if (averager.Num % 1000 == 0) + { + _log.Info("Ticks per millisecond = " + TimeSpan.TicksPerMillisecond); + _log.Info(String.Format("Average latency (ms) = {0}", averager)); + _log.Info("Received message count: " + averager.Num); + } + + if (averager.Num == _expectedMessageCount) + { + _log.Info(String.Format("Final average latency (ms) = {0}", averager)); + + double timeTakenSeconds = (DateTime.Now.Ticks - _startTime) * 1.0 / (TimeSpan.TicksPerMillisecond * 1000); + _log.Info("Total time taken to receive " + _expectedMessageCount + " messages was " + + timeTakenSeconds + "s, equivalent to " + + (_expectedMessageCount/timeTakenSeconds) + " messages per second"); + + _finishedEvent.Set(); // Notify main thread to quit. + } + } + + public static void Main(String[] args) + { + ServiceRequestingClient c = new ServiceRequestingClient(); + c.Init(); + c.SendMessages(); + } + } + + class Avergager + { + long num = 0; + long sum = 0; + + long min = long.MaxValue; + long max = long.MinValue; + + public void Add(long item) + { + ++num; + sum += item; + if (item < min) min = item; + if (item > max) max = item; + } + + public long Average { get { return sum/num; }} + + public long Num { get { return num; } } + + public override string ToString() + { + return String.Format("average={0} min={1} max={2}", Average, min, max); + } + } +} diff --git a/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs new file mode 100644 index 0000000000..63c936d667 --- /dev/null +++ b/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using NUnit.Framework; +using Qpid.Messaging; + +namespace Qpid.Client.Tests +{ + [TestFixture] + public class UndeliverableTest : BaseMessagingTestFixture + { + private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest)); + + [SetUp] + public override void Init() + { + base.Init(); + + try + { + _connection.ExceptionListener = new ExceptionListenerDelegate(OnException); + } + catch (QpidException e) + { + _logger.Error("Could not add ExceptionListener", e); + } + } + + public static void OnException(Exception e) + { + // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message. + + _logger.Error("OnException handler received connection-level exception", e); + if (e is QpidException) + { + QpidException qe = (QpidException)e; + if (qe.InnerException is AMQUndeliveredException) + { + AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException; + _logger.Error("inner exception is AMQUndeliveredException", ue); + _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage())); + + } + } + } + + [Test] + public void SendUndeliverableMessage() + { + SendOne("default exchange", null); + SendOne("direct exchange", ExchangeNameDefaults.DIRECT); + SendOne("topic exchange", ExchangeNameDefaults.TOPIC); + SendOne("headers exchange", ExchangeNameDefaults.HEADERS); + + Thread.Sleep(1000); // Wait for message returns! + } + + private void SendOne(string exchangeNameFriendly, string exchangeName) + { + _logger.Info("Sending undeliverable message to " + exchangeNameFriendly); + + // Send a test message to a non-existant queue on the default exchange. See if message is returned! + MessagePublisherBuilder builder = _channel.CreatePublisherBuilder() + .withRoutingKey("Non-existant route key!") + .withMandatory(true); + if (exchangeName != null) + { + builder.withExchangeName(exchangeName); + } + IMessagePublisher publisher = builder.Create(); + publisher.Send(_channel.CreateTextMessage("Hiya!")); + } + } +} |
