diff options
| author | Clifford Allan Jansen <cliffjansen@apache.org> | 2010-10-24 23:29:37 +0000 |
|---|---|---|
| committer | Clifford Allan Jansen <cliffjansen@apache.org> | 2010-10-24 23:29:37 +0000 |
| commit | f86e9093784d583f9af5068cbb5cdbc9a0f90e19 (patch) | |
| tree | f6ce582ef2e06f9375429cf90936360ce33e81e5 /qpid/wcf/src/Apache/Qpid | |
| parent | b3fba89268a91558b7d5012bdc9a14f8385c89a5 (diff) | |
| download | qpid-python-f86e9093784d583f9af5068cbb5cdbc9a0f90e19.tar.gz | |
QPID-2646 patches
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1026915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid')
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs | 23 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs | 113 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj | 1 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp | 29 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h | 6 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp | 20 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/InputLink.h | 8 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj | 8 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp | 12 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h | 13 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp | 304 | ||||
| -rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h | 89 |
12 files changed, 572 insertions, 54 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs index 0f649dcd36..4099571fe0 100644 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs @@ -29,7 +29,7 @@ namespace Apache.Qpid.AmqpTypes // AMQP 0-10 delivery properties private bool durable; private Nullable<TimeSpan> timeToLive; - private string routingKey; + private string subject; // AMQP 0-10 message properties private string replyToExchange; @@ -50,7 +50,7 @@ namespace Apache.Qpid.AmqpTypes { get { - return ((this.routingKey != null) || this.durable || this.timeToLive.HasValue); + return ((this.subject != null) || this.durable || this.timeToLive.HasValue); } } @@ -163,10 +163,19 @@ namespace Apache.Qpid.AmqpTypes set { this.timeToLive = value; } } + /// <summary> + /// Obsolete: switch to AMQP 1.0 "Subject" naming + /// </summary> public string RoutingKey { - get { return this.routingKey; } - set { this.routingKey = value; } + get { return this.subject; } + set { this.subject = value; } + } + + public string Subject + { + get { return this.subject; } + set { this.subject = value; } } public string ReplyToExchange @@ -200,7 +209,7 @@ namespace Apache.Qpid.AmqpTypes public void Clear() { this.timeToLive = null; - this.routingKey = null; + this.subject = null; this.replyToRoutingKey = null; this.replyToExchange = null; this.durable = false; @@ -251,9 +260,9 @@ namespace Apache.Qpid.AmqpTypes this.replyToRoutingKey = other.replyToRoutingKey; } - if (other.routingKey != null) + if (other.subject != null) { - this.routingKey = other.routingKey; + this.subject = other.subject; } if (other.durable) diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs index a6f6ee6800..6f0ffd9815 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs @@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel using System.Text; using System.Threading; using System.Globalization; + using System.Web; using System.Xml; // the thin interop layer that provides access to the Qpid AMQP client libraries @@ -52,11 +53,11 @@ namespace Apache.Qpid.Channel private bool shared; private int prefetchLimit; private string encoderContentType; + // AMQP subject/routing key + private string subject; + // Qpid addressing value for "qpid.subject" property + private string qpidSubject; - // input = 0-10 queue, output = 0-10 exchange - private string queueName; - - private String routingKey; private BufferManager bufferManager; private AmqpProperties outputMessageProperties; @@ -85,7 +86,7 @@ namespace Apache.Qpid.Channel this.remoteAddress = remoteAddress; // pull out host, port, queue, and connection arguments - this.ParseAmqpUri(remoteAddress.Uri); + string qpidAddress = this.UriToQpidAddress(remoteAddress.Uri, out subject); this.encoder = msgEncoder; string ct = String.Empty; @@ -129,12 +130,14 @@ namespace Apache.Qpid.Channel if (this.isInputChannel) { - this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName); + this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, qpidAddress); this.inputLink.PrefetchLimit = this.prefetchLimit; } else { - this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, this.queueName); + this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, qpidAddress); + this.subject = this.outputLink.DefaultSubject; + this.qpidSubject = this.outputLink.QpidSubject; } } @@ -423,9 +426,14 @@ namespace Apache.Qpid.Channel outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties); } - if (this.routingKey != null) + if (this.subject != null) { - outgoingProperties.RoutingKey = this.routingKey; + outgoingProperties.RoutingKey = this.subject; + } + + if (this.qpidSubject != null) + { + outgoingProperties.PropertyMap["qpid.subject"] = new AmqpString(this.qpidSubject); } // Add the Properties set by the application on this particular message. @@ -544,8 +552,7 @@ namespace Apache.Qpid.Channel this.bufferManager.Clear(); } - // "amqp:queue1" | "amqp:stocks@broker1.com" | "amqp:queue3?routingkey=key" - private void ParseAmqpUri(Uri uri) + private string UriToQpidAddress(Uri uri, out string subject) { if (uri.Scheme != AmqpConstants.Scheme) { @@ -553,43 +560,83 @@ namespace Apache.Qpid.Channel "The scheme {0} specified in address is not supported.", uri.Scheme), "uri"); } - this.queueName = uri.LocalPath; + subject = ""; + string path = uri.LocalPath; + string query = uri.Query; + + // legacy... convert old style myqueue?routingkey=key to myqueue/key - if ((this.queueName.IndexOf('@') != -1) && this.isInputChannel) + if (query.Length > 0) { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "Invalid input queue name: \"{0}\" specified.", this.queueName), "uri"); - } + if (!query.StartsWith("?")) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid query argument."), "uri"); + } - // search out session parameters in the query portion of the URI + string routingParseKey = "routingkey="; + string subjectParseKey = "subject="; + char[] charSeparators = new char[] { '?', ';' }; + string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries); + foreach (string s in args) + { + if (s.StartsWith(routingParseKey)) + { + subject = s.Substring(routingParseKey.Length); + } + else if (s.StartsWith(subjectParseKey)) + { + subject = s.Substring(subjectParseKey.Length); + } + else + { + if (s.Length > 0) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid query argument {0}.", s), "uri"); + } + } + } - string routingParseKey = "routingkey="; - char[] charSeparators = new char[] { '?', ';' }; - string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries); - foreach (string s in args) - { - if (s.StartsWith(routingParseKey)) + if (path.Contains("/")) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid queue name {0}.", path), "uri"); + } + + if (path.Length == 0) { - this.routingKey = s.Substring(routingParseKey.Length); + // special case, user wants default exchange + return "//" + subject; } + + return path + "/" + subject; } - if (this.queueName == String.Empty) + // find subject in "myqueue/mysubject;{mode:browse}" + int pos = path.IndexOf('/'); + if ((pos > -1) && (pos < path.Length + 1)) { - if (this.isInputChannel) + subject = path.Substring(pos); + pos = subject.IndexOf(';'); + if (pos == 0) { throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "Empty queue target specifier not allowed."), "uri"); + "Empty subject in address {0}.", path), "uri"); } - else + + if (pos > 0) { - if (this.routingKey == null) - { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "No target queue or routing key specified."), "uri"); - } + subject = subject.Substring(0, pos); } } + + if (subject.Length > 0) + { + subject = HttpUtility.UrlDecode(subject); + } + + return HttpUtility.UrlDecode(path); } } } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj index 6bb059daf6..1eb811b425 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj +++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj @@ -97,6 +97,7 @@ under the License. <RequiredTargetFramework>3.0</RequiredTargetFramework>
</Reference>
<Reference Include="System.Transactions" />
+ <Reference Include="System.Web" />
<Reference Include="System.XML" />
</ItemGroup>
<ItemGroup>
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp index d2adb41205..ac7c777d1f 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp @@ -357,17 +357,20 @@ bool AmqpSession::MessageStop(std::string &name) return true; } -void AmqpSession::AcceptAndComplete(SequenceSet& transfers) +void AmqpSession::AcceptAndComplete(SequenceSet& transfers, bool browsing) { lock l(sessionLock); - // delimit with session dtx commands depending on the transaction context - UpdateTransactionState(%l); + if (!browsing) { + // delimit with session dtx commands depending on the transaction context + UpdateTransactionState(%l); + } CheckOpen(); sessionp->markCompleted(transfers, false); - sessionp->messageAccept(transfers, false); + if (!browsing) + sessionp->messageAccept(transfers, false); } @@ -609,4 +612,22 @@ void AmqpSession::ReleaseCompletion(IntPtr completion) { delete completion.ToPointer(); } + +// Non-exclusive borrowing for a "brief" period. I.e. several synced +// commands (address resolution) + +IntPtr AmqpSession::BorrowNativeSession() { + lock l(sessionLock); + if (closing) + return IntPtr::Zero; + + IncrementSyncs(); + return (IntPtr) sessionp; +} + +void AmqpSession::ReturnNativeSession() { + lock l(sessionLock); + DecrementSyncs(); +} + }}} // namespace Apache::Qpid::Cli diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h index 88ffd18dcc..7a49496805 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h @@ -78,7 +78,7 @@ public: OutputLink^ CreateOutputLink(System::String^ targetQueue); InputLink^ CreateInputLink(System::String^ sourceQueue); - // 0-10 specific support + // 0-10 specific support; deprecated in favor of Qpid messaging addresses InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange); void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey); @@ -90,7 +90,7 @@ internal: void internalWaitForCompletion(IntPtr future); void removeWaiter(CompletionWaiter^ waiter); bool MessageStop(std::string &name); - void AcceptAndComplete(SequenceSet& transfers); + void AcceptAndComplete(SequenceSet& transfers, bool browsing); IntPtr BeginPhase0Flush(XaTransaction^); void EndPhase0Flush(XaTransaction^, IntPtr); IntPtr DtxStart(IntPtr xidp, bool, bool); @@ -98,6 +98,8 @@ internal: IntPtr DtxCommit(IntPtr xidp, bool onePhase); IntPtr DtxRollback(IntPtr xidp); void ReleaseCompletion(IntPtr completion); + IntPtr BorrowNativeSession(); + void ReturnNativeSession(); property AmqpConnection^ Connection { AmqpConnection^ get () { return connection; } diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp index 3245cd3540..2b0119e338 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -88,9 +88,12 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, waiters = gcnew Collections::Generic::List<MessageWaiter^>(); linkLock = waiters; // private and available subscriptionLock = gcnew Object(); + qpidAddress = QpidAddress::CreateAddress(sourceQueue, true); + qpidAddress->ResolveLink(session); + browsing = qpidAddress->Browsing; try { - std::string qname = QpidMarshal::ToNative(sourceQueue); + std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName); if (temporary) { qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true); @@ -104,6 +107,15 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, settings.flowControl = FlowControl::messageCredit(0); settings.completionMode = CompletionMode::MANUAL_COMPLETION; + if (browsing) { + settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED; + settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE; + } + else { + settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED; + settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT; + } + Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup @@ -186,8 +198,10 @@ void InputLink::Cleanup() { ReleaseNative(); } - } + + // Now that subscription is torn down, we can execute pending delete on remote node + qpidAddress->CleanupLink(amqpSession); amqpSession->NotifyClosed(); } @@ -699,7 +713,7 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) // subscriptionp->accept(frameSetID) is a slow sync operation in the native API // so do it within the AsyncSession directly - amqpSession->AcceptAndComplete(frameSetID); + amqpSession->AcceptAndComplete(frameSetID, browsing); workingCredit--; // check if more messages need to be requested from broker diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h index 2f96b91944..136d53d280 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h @@ -20,6 +20,7 @@ #pragma once #include "MessageWaiter.h" +#include "QpidAddress.h" namespace Apache { namespace Qpid { @@ -58,6 +59,9 @@ private: // working credit low water mark int minWorkingCredit; + bool browsing; + QpidAddress^ qpidAddress; + void Cleanup(); void ReleaseNative(); bool haveMessage(); @@ -97,6 +101,10 @@ public: void set (int value); } + property bool Browsing { + bool get () { return browsing; } + } + }; }}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj index 2056c97d57..fe288cbe76 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj +++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj @@ -409,6 +409,10 @@ >
</File>
<File
+ RelativePath=".\QpidAddress.cpp"
+ >
+ </File>
+ <File
RelativePath=".\InputLink.cpp"
>
</File>
@@ -455,6 +459,10 @@ >
</File>
<File
+ RelativePath=".\QpidAddress.h"
+ >
+ </File>
+ <File
RelativePath=".\InputLink.h"
>
</File>
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp index 27725b8207..de7141dadb 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp +++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp @@ -48,13 +48,14 @@ using namespace std; using namespace Apache::Qpid::AmqpTypes; -OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) : +OutputLink::OutputLink(AmqpSession^ session, String^ address) : amqpSession(session), - queue(defaultQueue), disposed(false), maxFrameSize(session->Connection->MaxFrameSize), finalizing(false) { + qpidAddress = QpidAddress::CreateAddress(address, false); + qpidAddress->ResolveLink(session); } void OutputLink::Cleanup() @@ -67,6 +68,8 @@ void OutputLink::Cleanup() disposed = true; } + // process any pending queue delete + qpidAddress->CleanupLink(amqpSession); amqpSession->NotifyClosed(); } @@ -217,7 +220,8 @@ void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout) ManagedToNative(amqpMessage); MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; - CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr); + CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, + timeout, false, nullptr, nullptr); if (waiter != nullptr) { waiter->WaitForCompletion(); @@ -234,7 +238,7 @@ IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, ManagedToNative(amqpMessage); MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; - CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state); + CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, true, callback, state); return waiter; } diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h index 1f049a7412..e30d1cc79f 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h @@ -19,6 +19,8 @@ #pragma once +#include "QpidAddress.h" + namespace Apache { namespace Qpid { namespace Interop { @@ -34,7 +36,7 @@ public ref class OutputLink { private: AmqpSession^ amqpSession; - String^ queue; + QpidAddress^ qpidAddress; bool disposed; bool finalizing; void Cleanup(); @@ -58,6 +60,15 @@ public: AmqpTypes::AmqpProperties^ get () { return defaultProperties; } void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; } } + + property String^ DefaultSubject { + String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->RoutingKey; } + } + + property String^ QpidSubject { + String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->Subject; } + } + }; diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp new file mode 100644 index 0000000000..bfae1ab313 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp @@ -0,0 +1,304 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + + +/* + * This program parses strings of the form "node/subject;{options}" as + * used in the Qpid messaging API. It provides basic wiring + * capabilities to create/delete temporary queues (to topic + * subsciptions) and unbound "point and shoot" queues. + */ + + +#include <windows.h> +#include <msclr\lock.h> +#include <oletx2xa.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/Message.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/client/Future.h" + +#include "AmqpConnection.h" +#include "AmqpSession.h" +#include "AmqpMessage.h" +#include "MessageBodyStream.h" +#include "InputLink.h" +#include "OutputLink.h" +#include "QpidMarshal.h" +#include "QpidException.h" +#include "QpidAddress.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace msclr; + +using namespace qpid::client; +using namespace std; + +QpidAddress::QpidAddress(String^ s, bool isInput) { + address = s; + nodeName = s; + isInputChannel = isInput; + isQueue = true; + + if (address->StartsWith("//")) { + // special case old style address to default exchange, + // no options, output only + if ((s->IndexOf(';') != -1) || isInputChannel) + throw gcnew ArgumentException("Invalid 0-10 address: " + address); + nodeName = nodeName->Substring(2); + return; + } + + String^ options = nullptr; + int pos = s->IndexOf(';'); + if (pos != -1) { + options = s->Substring(pos + 1); + nodeName = s->Substring(0, pos); + + if (options->Length > 0) { + if (!options->StartsWith("{") || !options->EndsWith("}")) + throw gcnew ArgumentException("Invalid address: " + address); + options = options->Substring(1, options->Length - 2); + array<String^>^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries); + + if ((subOpts->Length % 2) != 0) + throw gcnew ArgumentException("Bad address (options): " + address); + + for (int i=0; i < subOpts->Length; i += 2) { + String^ opt = subOpts[i]; + String^ optArg = subOpts[i+1]; + if (opt->Equals("create")) { + creating = PolicyApplies(optArg); + } + else if (opt->Equals("delete")) { + deleting = PolicyApplies(optArg); + } + else if (opt->Equals("mode")) { + if (optArg->Equals("browse")) { + browsing = isInputChannel; + } + else if (!optArg->Equals("consume")) { + throw gcnew ArgumentException("Invalid browsing option: " + optArg); + } + } + else if (opt->Equals("assert") || opt->Equals("node")) { + throw gcnew ArgumentException("Unsupported address option: " + opt); + } + else { + throw gcnew ArgumentException("Bad address option: " + opt); + } + } + } + else + options = nullptr; + } + + pos = nodeName->IndexOf('/'); + if (pos != -1) { + subject = nodeName->Substring(pos + 1); + if (String::IsNullOrEmpty(subject)) + subject = nullptr; + nodeName = nodeName->Substring(0, pos); + } +} + + +QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) { + QpidAddress^ addr = gcnew QpidAddress(s, isInput); + return addr; +} + + +void QpidAddress::ResolveLink(AmqpSession^ amqpSession) { + + AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); + if (asyncSessionp == NULL) + throw gcnew ObjectDisposedException("session"); + + deleteName = nullptr; + isQueue = true; + + try { + Session session = sync(*asyncSessionp); + std::string n_name = QpidMarshal::ToNative(nodeName); + ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name); + + bool queueFound = !result.getQueueNotFound(); + bool exchangeFound = !result.getExchangeNotFound(); + + if (isInputChannel) { + + if (queueFound) { + linkName = nodeName; + if (deleting) + deleteName = nodeName; + } + else if (exchangeFound) { + isQueue = false; + String^ tmpkey = nullptr; + String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString(); + bool haveSubject = !String::IsNullOrEmpty(subject); + FieldTable bindArgs; + + std::string exchangeType = session.exchangeQuery(n_name).getType(); + if (exchangeType == "topic") { + tmpkey = haveSubject ? subject : "#"; + } + else if (exchangeType == "fanout") { + tmpkey = tmpname; + } + else if (exchangeType == "headers") { + tmpkey = haveSubject ? subject : "match-all"; + if (haveSubject) + bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject)); + bindArgs.setString("x-match", "all"); + } + else if (exchangeType == "xml") { + tmpkey = haveSubject ? subject : ""; + if (haveSubject) { + String^ v = "declare variable $qpid.subject external; $qpid.subject = '" + + subject + "'"; + bindArgs.setString("xquery", QpidMarshal::ToNative(v)); + } + else + bindArgs.setString("xquery", "true()"); + } + else { + tmpkey = haveSubject ? subject : ""; + } + + std::string qn = QpidMarshal::ToNative(tmpname); + session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true); + bool success = false; + try { + session.exchangeBind(arg::exchange=n_name, arg::queue=qn, + arg::bindingKey=QpidMarshal::ToNative(tmpkey), + arg::arguments=bindArgs); + bindKey = tmpkey; // remember for later cleanup + success = true; + } + finally { + if (!success) + session.queueDelete(arg::queue=qn); + } + linkName = tmpname; + deleteName = tmpname; + deleting = true; + } + else if (creating) { + // only create "point and shoot" queues for now + session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); + // leave unbound + + linkName = nodeName; + + if (deleting) + deleteName = nodeName; + } + else { + throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); + } + } + else { + // Output channel + + bool oldStyleUri = address->StartsWith("//"); + + if (queueFound) { + linkName = ""; // default exchange for point and shoot + routingKey = nodeName; + if (deleting) + deleteName = nodeName; + } + else if (exchangeFound && !oldStyleUri) { + isQueue = false; + linkName = nodeName; + routingKey = subject; + } + else if (creating) { + // only create "point and shoot" queues for now + session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); + // leave unbound + linkName = ""; + routingKey = nodeName; + if (deleting) + deleteName = nodeName; + } + else { + throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); + } + } + } + finally { + amqpSession->ReturnNativeSession(); + } +} + +void QpidAddress::CleanupLink(AmqpSession^ amqpSession) { + if (deleteName == nullptr) + return; + + AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); + if (asyncSessionp == NULL) { + // TODO: log it: can't undo tear down actions + return; + } + + try { + Session session = sync(*asyncSessionp); + std::string q = QpidMarshal::ToNative(deleteName); + if (isInputChannel && !isQueue) { + // undo the temp wiring to the topic + session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q, + arg::bindingKey=QpidMarshal::ToNative(bindKey)); + } + session.queueDelete(q); + } + catch (Exception^ e) { + // TODO: log it + } + finally { + amqpSession->ReturnNativeSession(); + } +} + +bool QpidAddress::PolicyApplies(String^ mode) { + if (mode->Equals("always")) + return true; + if (mode->Equals("sender")) + return !isInputChannel; + if (mode->Equals("receiver")) + return isInputChannel; + if (mode->Equals("never")) + return false; + + throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address)); +} + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h new file mode 100644 index 0000000000..d24317c2aa --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h @@ -0,0 +1,89 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +#include "MessageWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + + +public ref class QpidAddress +{ +private: + QpidAddress(String^ address, bool isInput); + + // the original Qpid messaging address string, with WCF uri sematics removed, and URL decoded + String^ address; + + String^ nodeName; + // "qpid.subject" + String^ subject; + // 0-10 routing key (Output channels only) + String^ routingKey; + + String^ linkName; + String^ deleteName; + String^ bindKey; + + // node type: queue/topic + bool isQueue; + + // direction + bool isInputChannel; + + bool creating; + bool deleting; + bool browsing; + + bool PolicyApplies(String^ mode); + +internal: + static QpidAddress^ CreateAddress(String ^s, bool isInput); + void ResolveLink(AmqpSession^ amqpSession); + void CleanupLink(AmqpSession^ amqpSession); + + property String^ LinkName { + String^ get () { return linkName; } + } + + property String^ Subject { + String^ get () { return subject; } + } + + property String^ RoutingKey { + String^ get () { return routingKey; } + } + + property bool Browsing { + bool get () { return browsing; } + } + +}; + +}}} // namespace Apache::Qpid::Interop |
