From 61cd187331ac9491240689f6de1b46bfb661a48e Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Tue, 27 Apr 2010 21:38:39 +0000 Subject: Apply all three patches from QPID-2500. Also included WcfPerfTest into the QpidWcf.sln solution. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@938677 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/wcf/QpidWcf.sln | 12 +- qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs | 23 ++- .../src/Apache/Qpid/Channel/AmqpChannelFactory.cs | 17 ++ .../src/Apache/Qpid/Channel/AmqpChannelHelpers.cs | 80 ++++++++- .../src/Apache/Qpid/Channel/AmqpChannelListener.cs | 16 ++ .../src/Apache/Qpid/Channel/AmqpCredentialType.cs | 37 ++++ qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs | 75 ++++++++ .../Qpid/Channel/AmqpTransportBindingElement.cs | 30 +++- .../Apache/Qpid/Channel/AmqpTransportSecurity.cs | 108 +++++++++++ qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj | 5 + .../src/Apache/Qpid/Channel/ConnectionManager.cs | 67 ++++++- qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp | 200 +++++++++++++++++---- .../wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp | 109 ++++++++++- qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h | 26 +-- .../Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs | 125 ++++++++++++- 15 files changed, 866 insertions(+), 64 deletions(-) create mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs create mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs create mode 100644 qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs diff --git a/qpid/wcf/QpidWcf.sln b/qpid/wcf/QpidWcf.sln index 9f83cf553a..9e3cc5621e 100644 --- a/qpid/wcf/QpidWcf.sln +++ b/qpid/wcf/QpidWcf.sln @@ -28,9 +28,11 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "Interop", "src\Apache\Qpid\ {820BFC34-A40F-46BA-B86B-05334854CA17} = {820BFC34-A40F-46BA-B86B-05334854CA17} EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Channel", "src\Apache\Qpid\Channel\Channel.csproj", "{8AABAB30-7D1E-4539-B7D1-05450262BAD2}" +EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FunctionalTests", "test\Apache\Qpid\Test\Channel\Functional\FunctionalTests.csproj", "{E2D8C779-E417-40BA-BEE1-EE034268482F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Channel", "src\Apache\Qpid\Channel\Channel.csproj", "{8AABAB30-7D1E-4539-B7D1-05450262BAD2}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WcfPerfTest", "test\Apache\Qpid\Test\Channel\WcfPerfTest\WcfPerfTest.csproj", "{D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -72,6 +74,14 @@ Global {8AABAB30-7D1E-4539-B7D1-05450262BAD2}.Release|Win32.Build.0 = Release|Any CPU {8AABAB30-7D1E-4539-B7D1-05450262BAD2}.Release|x64.ActiveCfg = Release|Any CPU {8AABAB30-7D1E-4539-B7D1-05450262BAD2}.Release|x64.Build.0 = Release|Any CPU + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|Win32.ActiveCfg = Debug|Any CPU + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|Win32.Build.0 = Debug|Any CPU + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|x64.ActiveCfg = Debug|Any CPU + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|x64.Build.0 = Debug|Any CPU + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|Win32.ActiveCfg = Release|Any CPU + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|Win32.Build.0 = Release|Any CPU + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|x64.ActiveCfg = Release|Any CPU + {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs index b0b71c87f3..cfb2e6095c 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs @@ -33,11 +33,11 @@ namespace Apache.Qpid.Channel { protected AmqpTransportBindingElement transport; protected MessageEncodingBindingElement encoding; + protected AmqpSecurity security; public AmqpBinding() + : this (new BinaryMessageEncodingBindingElement()) { - transport = new AmqpTransportBindingElement(); - encoding = new BinaryMessageEncodingBindingElement(); } protected AmqpBinding(MessageEncodingBindingElement encoding) @@ -70,6 +70,25 @@ namespace Apache.Qpid.Channel set { transport.PrefetchLimit = value; } } + public AmqpSecurity Security + { + get + { + if (security == null) + { + if (transport.ChannelProperties.AmqpTransportSecurity == null) + { + transport.ChannelProperties.AmqpTransportSecurity = new AmqpTransportSecurity(); + } + + security = new AmqpSecurity(transport.ChannelProperties.AmqpTransportSecurity); + transport.BindingSecurity = security; + } + + return security; + } + } + public bool Shared { get { return transport.Shared; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs index 5012c76d7e..9b27b00994 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs @@ -22,6 +22,7 @@ namespace Apache.Qpid.Channel using System; using System.ServiceModel; using System.ServiceModel.Channels; + using System.ServiceModel.Description; using System.Collections.Generic; using System.Collections.ObjectModel; @@ -33,12 +34,14 @@ namespace Apache.Qpid.Channel long maxBufferPoolSize; bool shared; int prefetchLimit; + BindingContext bindingContext; List openChannels; internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) : base(context.Binding) { this.bindingElement = bindingElement; + this.bindingContext = context; this.channelProperties = bindingElement.ChannelProperties.Clone(); this.shared = bindingElement.Shared; this.prefetchLimit = bindingElement.PrefetchLimit; @@ -81,6 +84,20 @@ namespace Apache.Qpid.Channel protected override void OnOpen(TimeSpan timeout) { + // check and freeze security properties now + AmqpSecurityMode mode = AmqpSecurityMode.None; + if (this.bindingElement.BindingSecurity != null) + { + mode = bindingElement.BindingSecurity.Mode; + } + + this.channelProperties.AmqpSecurityMode = mode; + if (mode == AmqpSecurityMode.None) + { + return; + } + + AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs index 0853b3d6f3..2a5b9410dc 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs @@ -24,6 +24,7 @@ namespace Apache.Qpid.Channel using System.Net.Sockets; using System.ServiceModel; using System.ServiceModel.Channels; + using System.ServiceModel.Description; using System.Globalization; using Apache.Qpid.AmqpTypes; @@ -67,7 +68,9 @@ namespace Apache.Qpid.Channel int brokerPort; TransferMode transferMode; AmqpProperties defaultMessageProperties; - + AmqpSecurityMode amqpSecurityMode; + AmqpTransportSecurity amqpTransportSecurity; + AmqpCredential amqpCredential; long maxBufferPoolSize; int maxReceivedMessageSize; @@ -77,6 +80,9 @@ namespace Apache.Qpid.Channel this.brokerPort = AmqpDefaults.BrokerPort; this.transferMode = AmqpDefaults.TransferMode; this.defaultMessageProperties = null; + this.amqpSecurityMode = AmqpSecurityMode.None; + this.amqpTransportSecurity = null; + this.amqpCredential = null; this.maxBufferPoolSize = AmqpDefaults.MaxBufferPoolSize; this.maxReceivedMessageSize = AmqpDefaults.MaxReceivedMessageSize; } @@ -89,6 +95,16 @@ namespace Apache.Qpid.Channel props.defaultMessageProperties = this.defaultMessageProperties.Clone(); } + if (this.amqpTransportSecurity != null) + { + props.amqpTransportSecurity = this.amqpTransportSecurity.Clone(); + } + + if (this.amqpCredential != null) + { + this.amqpCredential = this.amqpCredential.Clone(); + } + return props; } @@ -116,6 +132,24 @@ namespace Apache.Qpid.Channel set { this.defaultMessageProperties = value; } } + internal AmqpSecurityMode AmqpSecurityMode + { + get { return this.amqpSecurityMode; } + set { this.amqpSecurityMode = value; } + } + + internal AmqpTransportSecurity AmqpTransportSecurity + { + get { return this.amqpTransportSecurity; } + set { this.amqpTransportSecurity = value; } + } + + internal AmqpCredential AmqpCredential + { + get { return this.amqpCredential; } + set { this.amqpCredential = value; } + } + internal long MaxBufferPoolSize { get { return this.maxBufferPoolSize; } @@ -138,5 +172,49 @@ namespace Apache.Qpid.Channel throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue."); } } + + internal static void FindAuthenticationCredentials(AmqpChannelProperties channelProperties, + BindingContext bindingContext) + { + AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity; + if (tsec == null) + { + // no auth + return; + } + + if (tsec.CredentialType == AmqpCredentialType.Anonymous) + { + // no auth + return; + } + + // credentials search order: specific AmqpCredentials, specific + // ClientCredentials (if applicable), binding's default credentials + + AmqpCredential amqpCred = bindingContext.BindingParameters.Find(); + if (amqpCred != null) + { + channelProperties.AmqpCredential = amqpCred; + return; + } + + if (!tsec.IgnoreEndpointClientCredentials) + { + ClientCredentials cliCred = bindingContext.BindingParameters.Find(); + if (cliCred != null) + { + channelProperties.AmqpCredential = new AmqpCredential(cliCred.UserName.UserName, + cliCred.UserName.Password); + return; + } + } + + if (tsec.DefaultCredential != null) + { + channelProperties.AmqpCredential = tsec.DefaultCredential.Clone(); + } + } + } } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs index 3d7801e7c6..78655f2124 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs @@ -31,6 +31,7 @@ namespace Apache.Qpid.Channel MessageEncoderFactory messageEncoderFactory; AmqpTransportBindingElement bindingElement; AmqpChannelProperties channelProperties; + BindingContext bindingContext; bool shared; int prefetchLimit; long maxBufferPoolSize; @@ -45,6 +46,7 @@ namespace Apache.Qpid.Channel { this.bindingElement = bindingElement; this.channelProperties = bindingElement.ChannelProperties.Clone(); + this.bindingContext = context; this.shared = bindingElement.Shared; this.prefetchLimit = bindingElement.PrefetchLimit; @@ -100,6 +102,20 @@ namespace Apache.Qpid.Channel protected override void OnOpen(TimeSpan timeout) { + // check and freeze security properties now + AmqpSecurityMode mode = AmqpSecurityMode.None; + if (this.bindingElement.BindingSecurity != null) + { + mode = bindingElement.BindingSecurity.Mode; + } + + this.channelProperties.AmqpSecurityMode = mode; + if (mode == AmqpSecurityMode.None) + { + return; + } + + AmqpChannelHelpers.FindAuthenticationCredentials(this.channelProperties, this.bindingContext); } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs new file mode 100644 index 0000000000..2bafbbb54e --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpCredentialType.cs @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + /// + /// Enumerates the SASL authentication mechanisms used by the AMQP transport + /// + public enum AmqpCredentialType + { + /// + /// SASL ANONYMOUS mechanism + /// + Anonymous, + + /// + /// SASL PLAIN mechanism: username and password + /// + Plain + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs new file mode 100644 index 0000000000..5d88afb88f --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpSecurity.cs @@ -0,0 +1,75 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + + /// + /// Specifies the types of trasport-level and message-level security used by + /// an endpoint configured with an AmqpBinding. + /// + public sealed class AmqpSecurity + { + private AmqpSecurityMode mode; + private AmqpTransportSecurity transportSecurity; + + internal AmqpSecurity() + { + this.mode = AmqpSecurityMode.None; + } + + internal AmqpSecurity(AmqpTransportSecurity tsec) + { + if (tsec == null) + { + throw new ArgumentNullException("AmqpTransportSecurity"); + } + + this.mode = AmqpSecurityMode.Transport; + this.transportSecurity = tsec; + } + + /// + /// gets or sets the security mode. + /// + public AmqpSecurityMode Mode + { + get { return this.mode; } + set {this.mode = value; } + } + + /// + /// gets the security object that controls encryption + /// and authentication parameters for the AMQP transport. + /// + public AmqpTransportSecurity Transport + { + get + { + if (this.transportSecurity == null) + { + this.transportSecurity = new AmqpTransportSecurity(); + } + + return this.transportSecurity; + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs index 7993252309..a98f361d19 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs @@ -29,7 +29,8 @@ namespace Apache.Qpid.Channel { AmqpChannelProperties channelProperties; bool shared; - int prefetchLimit; + int prefetchLimit; + AmqpSecurity bindingSecurity; public AmqpTransportBindingElement() { @@ -43,6 +44,13 @@ namespace Apache.Qpid.Channel this.channelProperties = other.channelProperties.Clone(); this.shared = other.shared; this.prefetchLimit = other.prefetchLimit; + this.bindingSecurity = other.bindingSecurity; + } + + internal AmqpSecurity BindingSecurity + { + get { return this.bindingSecurity; } + set { this.bindingSecurity = value; } } public override IChannelFactory BuildChannelFactory(BindingContext context) @@ -88,6 +96,12 @@ namespace Apache.Qpid.Channel get { return channelProperties; } } + public AmqpCredential AmqpCredential + { + get { return this.channelProperties.AmqpCredential; } + set { this.channelProperties.AmqpCredential = value; } + } + public string BrokerHost { get { return this.channelProperties.BrokerHost; } @@ -123,6 +137,20 @@ namespace Apache.Qpid.Channel set { this.channelProperties.TransferMode = value; } } + public AmqpTransportSecurity TransportSecurity + { + get + { + if (this.channelProperties.AmqpTransportSecurity == null) + { + this.channelProperties.AmqpTransportSecurity = new AmqpTransportSecurity(); + } + + return this.channelProperties.AmqpTransportSecurity; + } + } + + public AmqpProperties DefaultMessageProperties { get { return this.channelProperties.DefaultMessageProperties; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs new file mode 100644 index 0000000000..41c36c7bcd --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportSecurity.cs @@ -0,0 +1,108 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + /// + /// This class is used by the AMQP Transport to set transport-level security settings for a binding + /// + public sealed class AmqpTransportSecurity + { + private AmqpCredentialType credentialType; + + // WCF frowns on unencrypted credentials on the wire, but AMQP is agnostic. + // For interoperability, allow SSL to be turned on/off independentaly. + private bool useSSL; + + // Allow per channel credentials, but also ease the common case where + // credentials are shared and wish to be globally set in a config file. + private AmqpCredential defaultCredential; + + // if true, do not look at context for ServiceModel.Description.ClientCredentials. + // ClientCredentials will be place of choice for WCF traditionalists + // to specify auth tokens to the AMQP server when Windows and SASL tokens + // look the same. At other times it makes no sense and sometimes it is + // confusing with Message-level credentials. + private bool ignoreEndpointClientCredentials; + + + internal AmqpTransportSecurity() + { + this.credentialType = AmqpCredentialType.Anonymous; + this.useSSL = true; + } + + /// + /// gets or sets the SASL mechanism for AMQP authentication between client and server. + /// + public AmqpCredentialType CredentialType + { + get { return this.credentialType; } + + set { this.credentialType = value; } + } + + /// + /// gets or sets the flag that controls the use of SSL encryption + /// over the network connection. + /// + public bool UseSSL + { + get { return this.useSSL; } + set { this.useSSL = value; } + } + + /// + /// gets the default credential object for authentication with the AMQP server. + /// + public AmqpCredential DefaultCredential + { + get + { + if (this.defaultCredential == null) + { + this.defaultCredential = new AmqpCredential("", ""); + } + + return this.defaultCredential; + } + } + + /// + /// gets or sets the endpoint ClientCredentials search parameter. If true, + /// only AmqpCredential objects are searched for in the surrounding context. + /// + public bool IgnoreEndpointClientCredentials + { + get { return this.ignoreEndpointClientCredentials; } + set { this.ignoreEndpointClientCredentials = value; } + } + + internal AmqpTransportSecurity Clone() + { + AmqpTransportSecurity sec = (AmqpTransportSecurity)this.MemberwiseClone(); + if (this.defaultCredential != null) + { + sec.defaultCredential = this.defaultCredential.Clone(); + } + + return sec; + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj index ac90fb7d64..dfa41c9417 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj +++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj @@ -64,6 +64,10 @@ under the License. + + + + @@ -72,6 +76,7 @@ under the License. + diff --git a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs index a63e5333f4..7238ff2120 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs @@ -22,6 +22,7 @@ namespace Apache.Qpid.Channel using System; using System.Collections; using System.Collections.Generic; + using System.Text; using System.Threading; using Apache.Qpid.Interop; @@ -61,7 +62,38 @@ namespace Apache.Qpid.Channel private static string MakeKey(AmqpChannelProperties props) { - return props.BrokerHost + ':' + props.BrokerPort + ':' + props.TransferMode; + StringBuilder sb = new StringBuilder(); + sb.Append(props.BrokerHost); + sb.Append(':'); + sb.Append(props.BrokerPort); + sb.Append(':'); + sb.Append(props.TransferMode); + + AmqpTransportSecurity sec = props.AmqpTransportSecurity; + if (sec == null) + { + return sb.ToString(); + } + + if (sec.UseSSL) + { + sb.Append(":SSL"); + } + + if (sec.CredentialType == AmqpCredentialType.Plain) + { + sb.Append(":saslP"); + AmqpCredential cred = props.AmqpCredential; + if (cred != null) + { + sb.Append(":NM:"); + sb.Append(cred.UserName); + sb.Append(":PW:"); + sb.Append(cred.Password); + } + } + + return sb.ToString(); } private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing) @@ -165,7 +197,38 @@ namespace Apache.Qpid.Channel if (connection == null) { - connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort); + if (channelProperties.AmqpSecurityMode != AmqpSecurityMode.None) + { + string user = null; + string passwd = null; + bool ssl = false; + bool saslPlain = false; + + AmqpTransportSecurity tsec = channelProperties.AmqpTransportSecurity; + if (tsec.UseSSL) + { + ssl = true; + } + + if (tsec.CredentialType == AmqpCredentialType.Plain) + { + saslPlain = true; + AmqpCredential plainCred = channelProperties.AmqpCredential; + if (plainCred != null) + { + user = plainCred.UserName; + passwd = plainCred.Password; + } + } + + connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort, + ssl, saslPlain, user, passwd); + } + else + { + connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort); + } + newConnection = true; if (this.shared) { diff --git a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp index f9d8bd8521..33d125e3c6 100644 --- a/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp +++ b/qpid/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp @@ -31,7 +31,7 @@ // registers the Qpid resource manager with DTC, the plugin is loaded and a successful // connection via xa_open is confirmed before completing registration and saving the DSN // connection string in the DTC log for possible recovery. On recovery, the DSN is re-used to -// restablish a new connection with the broker and perform recovery. +// re-establish a new connection with the broker and perform recovery. // // Because this plugin is not involved in coordinating any active transactions it only needs to // partially implement the XA interface. @@ -71,12 +71,19 @@ private: bool active; std::string host; int port; + std::string username; + std::string password; + bool ssl; + bool saslPlain; + int rmid; std::vector inDoubtXids; // current scan position, or -1 if no scan int cursor; public: - ResourceManager(int id, std::string h, int p) : rmid(id), host(h), port(p), active(false), cursor(-1) {} + ResourceManager(int id, std::string h, int p, bool sslP, bool saslPlainP, std::string uname, std::string pass) + : rmid(id), host(h), port(p), ssl(sslP), saslPlain(saslPlainP), username(uname), password(pass), + active(false), cursor(-1) {} ~ResourceManager() {} INT open(); INT close(); @@ -94,6 +101,7 @@ bool memLocked = false; #define QPIDHMCHARS 512 + void pinDll() { if (!memLocked) { char thisDllName[QPIDHMCHARS]; @@ -141,16 +149,53 @@ void QpidToXa(Xid &qpidXid, XID &winXid) { } -/* parse string from AmqpConnection.h +static char *dsnHeader = "QPIDdsnV2"; - this info will eventually include authentication tokens +const char* nextDot(const char *p) { + while (*p && (*p != '.')) + p++; + return p; +} - dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host, - System::Diagnostics::Process::GetCurrentProcess()->Id, - AppDomain::CurrentDomain->Id); -*/ +int getHexChar (char c) { + if ((c >= '0') && (c <= '9')) + return c - '0'; + + if ((c >= 'a') && (c <= 'f')) + return 10 + (c - 'a'); + + if ((c >= 'A') && (c <= 'F')) + return 10 + (c - 'A'); + + return -1; +} + +bool parseFromHex(const char* start, const char* end, std::string& target) +{ + const char *p = start; + + while ((p + 1) < end) { + int nibble = getHexChar(*p++); + if (nibble < 0) + return false; + int byte = (nibble << 4); + nibble = getHexChar(*p++); + if (nibble < 0) + return false; + byte += nibble; + target.append (1, (char) byte & 0xFF); + } + return (p == end); +} -bool parseDsn (const char *dsn, std::string& host, int& port) { + +// parse string from AmqpConnection::DataSourcename +// "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password" +// +// parse strictly and return false if the dsn is in a bad format + +bool parseDsn (const char *dsn, std::string& host, int& port, bool& ssl, bool& saslPlain, + std::string& username, std::string& password) { if (dsn == NULL) return false; @@ -158,45 +203,125 @@ bool parseDsn (const char *dsn, std::string& host, int& port) { if (len > 1024) return false; - int firstDot = 0; - for (int i = 0; i < len; i++) - if (dsn[i] == '.') { - firstDot = i; - break; - } - if (!firstDot) + if (strncmp(dsn, dsnHeader, strlen(dsnHeader))) return false; - // look for 2 dots side by side to indicate end of the host - int doubleDot = 0; - for (int i = firstDot + 1; i < (len - 1); i++) - if ((dsn[i] == '.') && (dsn[i+1] == '.')) { - doubleDot = i; - break; - } - if (!doubleDot) + const char *endp = dsn + len; + const char *tokenp = dsn + strlen(dsnHeader); + if (*tokenp != '.') + return false; + + // port + tokenp++; + if (tokenp >= endp) + return false; + if (*tokenp == '.') + return false; // null port not allowed + + const char *token_end = nextDot(tokenp); + if ((token_end - tokenp) > 5) return false; port = 0; - for (int i = 0; i < firstDot; i++) { - char c = dsn[i]; - if ((c < '0') || (c > '9')) + for (const char *p = tokenp; p < token_end; p++) { + if ((*p < '0') || (*p > '9')) + return false; + port = (10 * port) + (*p - '0'); + } + + if (port > 65535) + return false; + + // host + tokenp = token_end + 1; + if (tokenp >= endp) + return false; + if (*tokenp == '.') + return false; // null host not allowed + + token_end = nextDot(tokenp); + if (!parseFromHex(tokenp, token_end, host)) + return false; + + // skip the RM identifier, but verify it exists + tokenp = token_end + 1; + if (tokenp >= endp) + return false; + token_end = nextDot (tokenp); + if ((token_end - tokenp) < 3) + return false; + + // ssl: look for T or F + tokenp = token_end + 1; + if (tokenp >= endp) + return false; + if (*tokenp == 'T') + ssl = true; + else if (*tokenp == 'F') + ssl = false; + else + return false; + if (*++tokenp != '.') + return false; + + // sasl mechanism: A = anonymous, P = plain. More to come... + ++tokenp; + if (tokenp >= endp) + return false; + if (*(tokenp+1) != '.') + return false; + + if (*tokenp == 'A') { + saslPlain = false; + tokenp += 2; + // no auth tokens + } + else if (*tokenp == 'P') { + saslPlain = true; + tokenp += 2; + if (tokenp >= endp) return false; - port = (10 * port) + (c - '0'); + token_end = nextDot (tokenp); + if (!parseFromHex(tokenp, token_end, username)) + return false; + tokenp = token_end + 1; + + if (tokenp >= endp) + return false; + token_end = nextDot (tokenp); + if (!parseFromHex(tokenp, token_end, password)) + return false; + tokenp = token_end + 1; } + else + return false; - host.assign(dsn + firstDot + 1, (doubleDot - firstDot) - 1); - return true; + return (tokenp == endp); } + INT ResourceManager::open() { INT rv = XAER_RMERR; // placeholder until we successfully connect to resource active = true; LeaveCriticalSection(&rmLock); try { - qpidConnection.open(host, port); + ConnectionSettings settings; + settings.host = this->host; + settings.port = this->port; + + + if (ssl) + settings.protocol = "ssl"; + + if (saslPlain) { + settings.username = this->username; + settings.password = this->password; + settings.mechanism = "PLAIN"; + } + + qpidConnection.open(settings); qpidSession = qpidConnection.newSession(); rv = XA_OK; /* @@ -359,7 +484,7 @@ INT ResourceManager::recover(XID *xids, long count, long flags) { if (nXids > 0) { StructHelper decoder; Xid qpidXid; - for (int i = 0; i < nXids; i++) { + for (size_t i = 0; i < nXids; i++) { decoder.decode (qpidXid, wireFormatXids[i]); inDoubtXids.push_back(qpidXid); } @@ -369,7 +494,7 @@ INT ResourceManager::recover(XID *xids, long count, long flags) { // make sure none are too big, just in case - for (int i = 0; i < nXids; i++) { + for (size_t i = 0; i < nXids; i++) { Xid& xid = inDoubtXids[i]; size_t l1 = xid.hasGlobalId() ? xid.getGlobalId().size() : 0; size_t l2 = xid.hasBranchId() ? xid.getBranchId().size() : 0; @@ -449,10 +574,15 @@ INT __cdecl xa_open (char *xa_info, int rmid, long flags) { else { std::string brokerHost; int brokerPort; - if (parseDsn(xa_info, brokerHost, brokerPort)) { + std::string username; + std::string password; + bool ssl; + bool saslPlain; + + if (parseDsn(xa_info, brokerHost, brokerPort, ssl, saslPlain, username, password)) { try { - rmp = new ResourceManager(rmid, brokerHost, brokerPort); + rmp = new ResourceManager(rmid, brokerHost, brokerPort, ssl, saslPlain, username, password); rv = rmp->open(); if (rv != XA_OK) { diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp index c3afdf2280..1bc9a15d92 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp @@ -47,10 +47,9 @@ using namespace qpid::client; using namespace std; -// Note on locks: Use "this" for fast counting and idle/busy +// Note on locks: Use thisLock for fast counting and idle/busy // notifications. Use the "sessions" list to serialize session // creation/reaping and overall tear down. -// TODO: switch "this" lock to separate non-visible Object. AmqpConnection::AmqpConnection(String^ server, int port) : @@ -58,19 +57,65 @@ AmqpConnection::AmqpConnection(String^ server, int port) : busyCount(0), disposed(false) { + initialize (server, port, false, false, nullptr, nullptr); +} + +AmqpConnection::AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) : + connectionp(NULL), + busyCount(0), + disposed(false) +{ + initialize (server, port, ssl, saslPlain, username, password); +} + +void AmqpConnection::initialize(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) +{ + if (server == nullptr) + throw gcnew ArgumentNullException("AMQP server"); + if (saslPlain) { + if (username == nullptr) + throw gcnew ArgumentNullException("username"); + if (username == nullptr) + throw gcnew ArgumentNullException("password"); + } + bool success = false; System::Exception^ openException = nullptr; sessions = gcnew Collections::Generic::List(); + thisLock = gcnew Object(); try { connectionp = new Connection; - connectionp->open (QpidMarshal::ToNative(server), port); + + if (ssl || saslPlain) { + ConnectionSettings proposedSettings; + proposedSettings.host = QpidMarshal::ToNative(server); + proposedSettings.port = port; + if (ssl) + proposedSettings.protocol = "ssl"; + + if (saslPlain) { + proposedSettings.username = QpidMarshal::ToNative(username); + proposedSettings.password = QpidMarshal::ToNative(password); + proposedSettings.mechanism = "PLAIN"; + } + + connectionp->open (proposedSettings); + } + else { + connectionp->open (QpidMarshal::ToNative(server), port); + } + // TODO: registerFailureCallback for failover success = true; const ConnectionSettings& settings = connectionp->getNegotiatedSettings(); this->maxFrameSize = settings.maxFrameSize; this->host = server; this->port = port; + this->ssl = ssl; + this->saslPlain = saslPlain; + this->username = username; + this->password = password; this->isOpen = true; } catch (const qpid::Exception& error) { String^ errmsg = gcnew String(error.what()); @@ -89,7 +134,7 @@ AmqpConnection::AmqpConnection(String^ server, int port) : AmqpConnection^ AmqpConnection::Clone() { if (disposed) throw gcnew ObjectDisposedException("AmqpConnection.Clone"); - return gcnew AmqpConnection (this->host, this->port); + return gcnew AmqpConnection (this->host, this->port, this->ssl, this->saslPlain, this->username, this->password); } void AmqpConnection::Cleanup() @@ -153,7 +198,7 @@ void AmqpConnection::NotifyBusy() { bool changed = false; { - lock l(this); + lock l(thisLock); if (busyCount++ == 0) changed = true; } @@ -166,7 +211,7 @@ void AmqpConnection::NotifyIdle() { bool connectionIdle = false; { - lock l(this); + lock l(thisLock); if (--busyCount == 0) connectionIdle = true; } @@ -175,5 +220,57 @@ void AmqpConnection::NotifyIdle() } } +void HexAppend(StringBuilder^ sb, String^ s) { + if (s->Length > 0) { + array^ bytes = Encoding::UTF8->GetBytes(s); + for each (unsigned char b in bytes) { + sb->Append(String::Format("{0:x2}", b)); + } + } + sb->Append("."); +} + + +// Note: any change to this format has to be reflected in the DTC plugin's xa_open() +// for now: "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password" +// This extended info is needed so that the DTC can make a separate connection to the broker +// for recovery. + +String^ AmqpConnection::DataSourceName::get() { + if (dataSourceName == nullptr) { + StringBuilder^ sb = gcnew StringBuilder(); + sb->Append("QPIDdsnV2."); + + sb->Append(this->port); + sb->Append("."); + + HexAppend(sb, this->host); + + sb->Append(System::Diagnostics::Process::GetCurrentProcess()->Id); + sb->Append("-"); + sb->Append(AppDomain::CurrentDomain->Id); + sb->Append("."); + + if (this->ssl) + sb->Append("T"); + else + sb->Append("F"); + sb->Append("."); + + if (this->saslPlain) { + sb->Append("P."); + HexAppend(sb, this->username); + HexAppend(sb, this->password); + } + else { + // SASL anonymous + sb->Append("A."); + } + + dataSourceName = sb->ToString(); + } + return dataSourceName; +} + }}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h index 6533185fa1..ef4d0e3f37 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h @@ -36,17 +36,26 @@ public ref class AmqpConnection { private: Connection* connectionp; - String^ host; - int port; bool disposed; Collections::Generic::List^ sessions; bool isOpen; int busyCount; int maxFrameSize; DtxResourceManager^ dtxResourceManager; - void Cleanup(); // unique string used for distributed transactions String^ dataSourceName; + Object ^thisLock; + + // properties needed to allow DTC to do transactions (see DataSourceName + String^ host; + int port; + bool ssl; + bool saslPlain; + String^ username; + String^ password; + + void Cleanup(); + void initialize (System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password); internal: void NotifyBusy(); @@ -63,19 +72,12 @@ private: } property String^ DataSourceName { - // Note: any change to this format has to be reflected in the DTC plugin's xa_open() - String^ get() { - if (dataSourceName == nullptr) { - dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host, - System::Diagnostics::Process::GetCurrentProcess()->Id, - AppDomain::CurrentDomain->Id); - } - return dataSourceName; - } + String^ get(); } public: AmqpConnection(System::String^ server, int port); + AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password); ~AmqpConnection(); !AmqpConnection(); void Close(); diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs index 992d6e9bd2..39b122f3d4 100644 --- a/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs @@ -44,10 +44,14 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest public class QueueChannelFactory { private static AmqpBinding brokerBinding; + private static BindingParameterCollection bindingParameters; private static IChannelFactory readerFactory; private static IChannelFactory writerFactory; private static string brokerAddr = "127.0.0.1"; private static int brokerPort = 5672; + private static string userName; + private static string password; + private static bool ssl = false; public static void SetBroker(string addr, int port) { @@ -55,14 +59,43 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest brokerPort = port; } + public static void SetSecurity(bool sslMode, string name, string pass) + { + ssl = sslMode; + if (name != null) + { + userName = name; + password = pass; + } + } + private static void InitializeBinding() { AmqpBinaryBinding binding = new AmqpBinaryBinding(); + bindingParameters = new BindingParameterCollection(); + binding.BrokerHost = brokerAddr; binding.BrokerPort = brokerPort; binding.TransferMode = TransferMode.Streamed; binding.PrefetchLimit = 5000; binding.Shared = true; + + if (ssl || (userName != null)) + { + binding.Security.Mode = AmqpSecurityMode.Transport; + binding.Security.Transport.UseSSL = ssl; + + if (userName != null) + { + binding.Security.Transport.CredentialType = AmqpCredentialType.Plain; + + ClientCredentials credentials = new ClientCredentials(); + credentials.UserName.UserName = userName; + credentials.UserName.Password = password; + bindingParameters.Add(credentials); + } + } + brokerBinding = binding; } @@ -77,7 +110,7 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest if (readerFactory == null) { - readerFactory = brokerBinding.BuildChannelFactory(); + readerFactory = brokerBinding.BuildChannelFactory(bindingParameters); readerFactory.Open(); } @@ -100,7 +133,7 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest if (writerFactory == null) { - writerFactory = brokerBinding.BuildChannelFactory(); + writerFactory = brokerBinding.BuildChannelFactory(bindingParameters); writerFactory.Open(); } @@ -121,6 +154,12 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest InteropDemo } + public enum SaslMechanism + { + None, + Plain + } + public class Options { public string broker; @@ -132,6 +171,10 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest public int subTxSize; public int pubTxSize; public bool durable; + public bool ssl; + public string username; + public string password; + public SaslMechanism saslMechanism; public Options() { @@ -144,6 +187,10 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest this.pubTxSize = 0; this.subTxSize = 0; this.durable = false; + this.ssl = false; + this.username = null; + this.password = null; + this.saslMechanism = SaslMechanism.None; } public void Parse(string[] args) @@ -226,9 +273,64 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest this.durable = true; } } - + + else if (arg == "--protocol") + { + arg = args[++current]; + if (arg.Equals("ssl")) + { + this.ssl = true; + } + } + + else if (arg == "--username") + { + this.username = args[++current]; + } + + else if (arg == "--password") + { + this.password = args[++current]; + } + + else if (arg == "--mechanism") + { + arg = args[++current]; + if (arg.Equals("PLAIN", StringComparison.OrdinalIgnoreCase)) + { + this.saslMechanism = SaslMechanism.Plain; + } + } + + else + { + throw new ArgumentException(String.Format("unknown argument \"{0}\"", arg)); + } + current++; } + + if (this.saslMechanism == SaslMechanism.Plain) + { + // use guest/guest as defaults if neither is specified + if ((this.username == null) && (this.password == null)) + { + this.username = "guest"; + this.password = "guest"; + } + else + { + if (this.username == null) + { + this.username = ""; + } + if (this.password == null) + { + this.password = ""; + } + } + } + } } @@ -302,16 +404,30 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest Console.WriteLine("transaction resource manager ready"); } + + // demonstrate message exchange between WcfPerftest.exe and native + // C++ perftest.exe + static void InteropDemo(Options opts) { string perftest_cpp_exe = "perftest.exe"; - string commonArgs = String.Format(" --count {0} --size {1}", opts.messageCount, opts.messageSize); + string commonArgs = String.Format(" --count {0} --size {1} --broker {2} --port {3}", opts.messageCount, opts.messageSize, opts.broker, opts.port); if (opts.durable) { commonArgs += " --durable yes"; } + if (opts.ssl) + { + commonArgs += " --protocol ssl"; + } + + if (opts.saslMechanism == SaslMechanism.Plain) + { + commonArgs += String.Format(" --username {0} --password {1} --mechanism PLAIN", opts.username, opts.password); + } + Console.WriteLine("===== WCF Subscriber and C++ Publisher ====="); Process setup = new Process(); @@ -386,6 +502,7 @@ namespace Apache.Qpid.Test.Channel.WcfPerftest Options opts = new Options(); opts.Parse(mainArgs); QueueChannelFactory.SetBroker(opts.broker, opts.port); + QueueChannelFactory.SetSecurity(opts.ssl, opts.username, opts.password); WarmUpTransactionSubsystem(opts); -- cgit v1.2.1