diff options
| -rw-r--r-- | qpid/doc/book/src/Programming-In-Apache-Qpid.xml | 485 |
1 files changed, 469 insertions, 16 deletions
diff --git a/qpid/doc/book/src/Programming-In-Apache-Qpid.xml b/qpid/doc/book/src/Programming-In-Apache-Qpid.xml index 5fd47e68b4..93d9c4db54 100644 --- a/qpid/doc/book/src/Programming-In-Apache-Qpid.xml +++ b/qpid/doc/book/src/Programming-In-Apache-Qpid.xml @@ -1133,6 +1133,9 @@ options := map <section> <title>Logging</title> + <para>To simplify debugging, Qpid provides a logging facility + that prints out messaging events.</para> + <section> <title>Logging in C++</title> <para>The Qpidd broker and C++ clients can both use environment @@ -1473,8 +1476,8 @@ sender.send(message) </thead> <tbody> <row><entry>bool</entry><entry>bool</entry><entry>boolean</entry></row> - <row><entry>int</entry><entry>long</entry><entry>long</entry></row> - <row><entry>long</entry><entry>long</entry><entry>long</entry></row> + <row><entry>int</entry><entry>int64</entry><entry>long</entry></row> + <row><entry>long</entry><entry>int64</entry><entry>long</entry></row> <row><entry>float</entry><entry>double</entry><entry>double</entry></row> <row><entry>unicode</entry><entry>string</entry><entry>java.lang.String</entry></row> <row><entry>uuid</entry><entry>qpid::types::Uuid</entry><entry>java.util.UUID</entry></row> @@ -1565,14 +1568,12 @@ sender.send(message, true); </thead> <tbody> <row><entry>bool</entry><entry>bool</entry><entry>boolean</entry></row> - <row><entry>unsigned char</entry><entry>unicode</entry><entry>char</entry></row> - <row><entry>unsigned short int</entry><entry>int | long</entry><entry>short</entry></row> - <row><entry>unsigned int</entry><entry>int | long</entry><entry>int</entry></row> - <row><entry>unsigned long</entry><entry>int | long</entry><entry>long</entry></row> - <row><entry>char</entry><entry>unicode</entry><entry>char</entry></row> - <row><entry>short</entry><entry>int | long</entry><entry>short</entry></row> - <row><entry>int</entry><entry>int | long</entry><entry>int</entry></row> - <row><entry>long</entry><entry>int | long</entry><entry>long</entry></row> + <row><entry>uint16</entry><entry>int | long</entry><entry>short</entry></row> + <row><entry>uint32</entry><entry>int | long</entry><entry>int</entry></row> + <row><entry>uint64</entry><entry>int | long</entry><entry>long</entry></row> + <row><entry>int16</entry><entry>int | long</entry><entry>short</entry></row> + <row><entry>int32</entry><entry>int | long</entry><entry>int</entry></row> + <row><entry>int64</entry><entry>int | long</entry><entry>long</entry></row> <row><entry>float</entry><entry>float</entry><entry>float</entry></row> <row><entry>double</entry><entry>float</entry><entry>double</entry></row> <row><entry>string</entry><entry>unicode</entry><entry>java.lang.String</entry></row> @@ -1685,20 +1686,22 @@ sender.send(Message("Hello world!")); <para>Python:</para> --> </example> - +<!-- <section> <title>Cluster Failover #### </title> </section> +--> </section> </section> +<!-- <section> <title>Security ####</title> </section> - +--> <section> <title>Transactions</title> @@ -2696,10 +2699,9 @@ producer.send(m); </thead> <tbody> <row><entry>boolean</entry><entry>bool</entry><entry>bool</entry></row> - <row><entry>char</entry><entry>unicode<footnote><para>The Python string will contain one Unicode character</para></footnote></entry><entry>int</entry></row> - <row><entry>short</entry><entry>int | long</entry><entry>short</entry></row> - <row><entry>int</entry><entry>int | long</entry><entry>int</entry></row> - <row><entry>long</entry><entry>int | long</entry><entry>long</entry></row> + <row><entry>short</entry><entry>int | long</entry><entry>int16</entry></row> + <row><entry>int</entry><entry>int | long</entry><entry>int32</entry></row> + <row><entry>long</entry><entry>int | long</entry><entry>int64</entry></row> <row><entry>float</entry><entry>float</entry><entry>float</entry></row> <row><entry>double</entry><entry>float</entry><entry>double</entry></row> <row><entry>java.lang.String</entry><entry>unicode</entry><entry>std::string</entry></row> @@ -2814,6 +2816,457 @@ variable := ( <ID> | <QUOTED_ID> )]]></programlisting> </section> </chapter> + + <chapter id="QpidWCF"> + <title>Using the Qpid WCF client</title> + <section> + <title>XML and Binary Bindings</title> + + <para>The Qpid WCF client provides two bindings, each with support for + Windows .NET transactions.</para> + + <para>The AmqpBinding is suitable for communication between two WCF + applications. By default it uses the WCF binary .NET XML encoder + (BinaryMessageEncodingBindingElement) for efficient message + transmission, but it can also use the text and Message Transmission + Optimization Mechanism (MTOM) encoders. Here is a traditional service + model sample program using the AmqpBinding. It assumes that the queue + "hello_service_node" has been created and configured on the AMQP + broker.</para> + + <example> + <title>Traditional service model "Hello world!" example</title> + <programlisting><![CDATA[ +namespace Apache.Qpid.Documentation.HelloService +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Threading; + using Apache.Qpid.Channel; + + [ServiceContract] + public interface IHelloService + { + [OperationContract(IsOneWay = true, Action = "*")] + void SayHello(string greeting); + } + + public class HelloService : IHelloService + { + private static int greetingCount; + + public static int GreetingCount + { + get { return greetingCount; } + } + + public void SayHello(string greeting) + { + Console.WriteLine("Service received: " + greeting); + greetingCount++; + } + + static void Main(string[] args) + { + try + { + AmqpBinding amqpBinding = new AmqpBinding(); + amqpBinding.BrokerHost = "localhost"; + amqpBinding.BrokerPort = 5672; + + ServiceHost serviceHost = new ServiceHost(typeof(HelloService)); + serviceHost.AddServiceEndpoint(typeof(IHelloService), + amqpBinding, "amqp:hello_service_node"); + serviceHost.Open(); + + // Send the service a test greeting + Uri amqpClientUri = new Uri("amqp:amq.direct?routingkey=hello_service_node"); + EndpointAddress clientEndpoint = new EndpointAddress(amqpClientUri); + ChannelFactory<IHelloService> channelFactory = + new ChannelFactory<IHelloService>(amqpBinding, clientEndpoint); + IHelloService clientProxy = channelFactory.CreateChannel(); + + clientProxy.SayHello("Greetings from WCF client"); + + // wait for service to process the greeting + while (HelloService.GreetingCount == 0) + { + Thread.Sleep(100); + } + channelFactory.Close(); + serviceHost.Close(); + } + catch (Exception e) + { + Console.WriteLine("Exception: {0}", e); + } + } + } +} + ]]></programlisting> + </example> + + <para>The second binding, AmqpBinaryBinding, is suitable for WCF + applications that need to inter-operate with non-WCF clients or that + wish to have direct access to the raw wire representation of the + message body. It relies on a custom encoder to read and write raw + (binary) content which operates similarly to the ByteStream encoder + (introduced in .NET 4.0). The encoder presents an abstract XML + infoset view of the raw message content on input. On output, the + encoder does the reverse and peels away the XML infoset layer exposing + the raw content to the wire representation of the message body. The + application must do the inverse of what the encoder does to allow the + XML infoset wrapper to cancel properly. This is demonstrated in the + following sample code (using the channel programming model) which + directly manipulates or provides callbacks to the WCF message readers + and writers when the content is consumed. In contrast to the + AmqpBinding sample where the simple greeting is encapsulated in a + compressed SOAP envelope, the wire representation of the message + contains the raw content and is identical and fully interoperable with + the Qpid C++ "Hello world!" example.</para> + + <example> + <title>Binary "Hello world!" example using the channel model</title> + <programlisting><![CDATA[ +namespace Apache.Qpid.Samples.Channel.HelloWorld +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using System.Text; + using System.Xml; + using Apache.Qpid.Channel; + + public class HelloWorld + { + static void Main(string[] args) + { + String broker = "localhost"; + int port = 5672; + String target = "amq.topic"; + String source = "my_topic_node"; + + if (args.Length > 0) + { + broker = args[0]; + } + + if (args.Length > 1) + { + port = int.Parse(args[1]); + } + + if (args.Length > 2) + { + target = args[2]; + } + + if (args.Length > 3) + { + source = args[3]; + } + + AmqpBinaryBinding binding = new AmqpBinaryBinding(); + binding.BrokerHost = broker; + binding.BrokerPort = port; + + IChannelFactory<IInputChannel> receiverFactory = binding.BuildChannelFactory<IInputChannel>(); + receiverFactory.Open(); + IInputChannel receiver = receiverFactory.CreateChannel(new EndpointAddress("amqp:" + source)); + receiver.Open(); + + IChannelFactory<IOutputChannel> senderFactory = binding.BuildChannelFactory<IOutputChannel>(); + senderFactory.Open(); + IOutputChannel sender = senderFactory.CreateChannel(new EndpointAddress("amqp:" + target)); + sender.Open(); + + sender.Send(Message.CreateMessage(MessageVersion.None, "", new HelloWorldBinaryBodyWriter())); + + Message message = receiver.Receive(); + XmlDictionaryReader reader = message.GetReaderAtBodyContents(); + while (!reader.HasValue) + { + reader.Read(); + } + + byte[] binaryContent = reader.ReadContentAsBase64(); + string text = Encoding.UTF8.GetString(binaryContent); + + Console.WriteLine(text); + + senderFactory.Close(); + receiverFactory.Close(); + } + } + + public class HelloWorldBinaryBodyWriter : BodyWriter + { + public HelloWorldBinaryBodyWriter() : base (true) {} + + protected override void OnWriteBodyContents(XmlDictionaryWriter writer) + { + byte[] binaryContent = Encoding.UTF8.GetBytes("Hello world!"); + + // wrap the content: + writer.WriteStartElement("Binary"); + writer.WriteBase64(binaryContent, 0, binaryContent.Length); + } + } +} +]]></programlisting> + </example> + + <para>Bindings define ChannelFactories and ChannelListeners associated with + an AMQP Broker. WCF will frequently automatically create and manage + the life cycle of a these and the resulting IChannel objects used in + message transfer. The binding parameters that can be set are:</para> + + <table pgwide="1"> + <title>WCF Binding Parameters</title> + <tgroup cols="3"> + <thead> + <colspec colnum="1" colwidth="1*"/> + <colspec colnum="2" colwidth="3*"/> + <colspec colnum="3" colwidth="3*"/> + <row> + <entry>Parameter</entry> + <entry>Default</entry> + <entry>Description</entry> + </row> + </thead> + <tbody> + <row> + <entry> + BrokerHost + </entry> + <entry> + localhost + </entry> + <entry> + The broker's server name. Currently the WCF channel + only supports connections with a single broker. + Failover to multiple brokers will be provided in the + future. + </entry> + </row> + + <row> + <entry> + BrokerPort + </entry> + <entry> + 5672 + </entry> + <entry> + The port the broker is listening on. + </entry> + </row> + + <row> + <entry> + PrefetchLimit + </entry> + <entry> + 0 + </entry> + <entry> + The number of messages to prefetch from the amqp + broker before the application actually consumes them. + Increasing this number can dramatically increase the + read performance in some circumstances. + </entry> + </row> + + <row> + <entry> + Shared + </entry> + <entry> + false + </entry> + <entry> + Indicates if separate channels to the same broker can + share an underlying AMQP tcp connection (provided they + also share the same authentication credentials). + </entry> + </row> + + <row> + <entry> + TransferMode + </entry> + <entry> + buffered + </entry> + <entry> + Indicates whether the channel's encoder uses the WCF + BufferManager cache to temporarily store message + content during the encoding/decoding phase. For small + to medium sized SOAP based messages, buffered is + usually the preferred choice. For binary messages, + streamed TransferMode is the more efficient mode. + </entry> + </row> + </tbody> + </tgroup> + </table> + </section> + + <section> + <title>Endpoints</title> + + <para>In Qpid 0.6 the WCF Endpoints map to simple AMQP 0-10 + exchanges (IOutputChannel) or AMQP 0-10 queues (IInputChannel). + The format for an IOutputChannel is</para> + + <programlisting><![CDATA[ + "amqp:amq.direct" or "amqp:my_exchange?routingkey=my_routing_key" +]]></programlisting> + + <para>and for an IInputChannel is</para> + + <programlisting><![CDATA[ + "amqp:my_queue" +]]></programlisting> + + <para>The routing key is in fact a default value associated with + the particular channel. Outgoing messages can always have their + routing key uniquely set.</para> + + <para>If the respective queue or exchange doesn't exist, an exception + is thrown when opening the channel. Queues and exchanges can be + created and configured using qpid-config.</para> + + </section> + + <section> + <title>Message Headers</title> + + <para>AMQP specific message headers can be set on or retrieved + from the ServiceModel.Channels.Message using the AmqpProperties + type.</para> + + <para>For example, on output:</para> + + <programlisting><![CDATA[ + AmqpProperties props = new AmqpProperties(); + props.Durable = true; + props.PropertyMap.Add("my_custom_header", new AmqpString("a custom value")); + Message msg = Message.CreateMessage(args); + msg.Properties.Add("AmqpProperties", amqpProperties); + outputChannel.Send(msg); +]]></programlisting> + + <para>On input the headers can be accessed from the Message or extracted + from the operation context</para> + + <programlisting><![CDATA[ + public void SayHello(string greeting) + { + AmqpProperties props = (AmqpProperties) OperationContext. + Current.IncomingMessageProperties["AmqpProperties"]; + AmqpString extra = (AmqpString) props.PropertyMap["my_custom_header"]; + Console.WriteLine("Service received: {0} and {1}", greeting, extra); + } +]]></programlisting> + + </section> + + <section> + <title>Security</title> + + <para>To engage TLS/SSL:</para> + + <programlisting><![CDATA[ + binding.Security.Mode = AmqpSecurityMode.Transport; + binding.Security.Transport.UseSSL = true; + binding.BrokerPort = 5671; +]]></programlisting> + + <para>Currently the WCF client only provides SASL PLAIN (i.e. username and + password) authentication. To provide a username and password, you can + set the DefaultAmqpCredential value in the binding. This value can be + overridden or set for a binding's channel factories and listeners, + either by setting the ClientCredentials as a binding parameter, or by + using an AmqpCredential as a binding parameter. The search order for + credentials is the AmqpCredential binding parameter, followed by the + ClientCredentials (unless IgnoreEndpointClientCredentials has been + set), and finally defaulting to the DefaultAmqpCredential of the + binding itself. Here is a sample using ClientCredentials:</para> + + <programlisting><![CDATA[ + ClientCredentials credentials = new ClientCredentials(); + credentials.UserName.UserName = "guest"; + credentials.UserName.Password = "guest"; + bindingParameters = new BindingParameterCollection(); + bindingParameters.Add(credentials); + readerFactory = binding.BuildChannelFactory<IInputChannel>(bindingParameters); +]]></programlisting> + + </section> + + <section> + <title>Transactions</title> + + <para>The WCF channel provides a transaction resource manager + module and a recovery module that together provide distributed + transaction support with one-phase optimization. Some + configuration is required on Windows machines to enable + transaction support (see your installation notes or top level + ReadMe.txt file for instructions). Once properly configured, + the Qpid WCF channel acts as any other System.Transactions aware + resource, capable of participating in explicit or implicit + transactions.</para> + + <para>Server code:</para> + + <programlisting><![CDATA[ + [OperationBehavior(TransactionScopeRequired = true, + TransactionAutoComplete = true)] + public void SayHello(string greeting) + { + // increment ExactlyOnceReceived counter on DB + + // Success: transaction auto completes: + } +]]></programlisting> + + <para>Because this operation involves two transaction resources, the + database and the AMQP message broker, this operates as a full two + phase commit transaction managed by the Distributed Transaction + Coordinator service. If the transaction proceeds without error, + both ExactlyOnceReceived is incremented in the database and the AMQP + message is consumed from the broker. Otherwise, ExactlyOnceReceived is + unchanged and AMQP message is returned to its queue on the broker.</para> + + <para>For the client code a few changes are made to the non-transacted + example. For "exactly once" semantics, we set the AMQP "Durable" + message property and enclose the transacted activities in a + TransactionScope:</para> + + <programlisting><![CDATA[ + AmqpProperties myDefaults = new AmqpProperties(); + myDefaults.Durable = true; + amqpBinding.DefaultMessageProperties = myDefaults; + ChannelFactory<IHelloService> channelFactory = + new ChannelFactory<IHelloService>(amqpBinding, clientEndpoint); + IHelloService clientProxy = channelFactory.CreateChannel(); + + using (TransactionScope ts = new TransactionScope()) + { + AmqpProperties amqpProperties = new AmqpProperties(); + clientProxy.SayHello("Greetings from WCF client"); + // increment ExactlyOnceSent counter on DB + + ts.Complete(); + } +]]></programlisting> + + </section> + </chapter> + </book> <!-- |
