summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid
diff options
context:
space:
mode:
authorClifford Allan Jansen <cliffjansen@apache.org>2010-10-24 23:29:37 +0000
committerClifford Allan Jansen <cliffjansen@apache.org>2010-10-24 23:29:37 +0000
commitf86e9093784d583f9af5068cbb5cdbc9a0f90e19 (patch)
treef6ce582ef2e06f9375429cf90936360ce33e81e5 /qpid/wcf/src/Apache/Qpid
parentb3fba89268a91558b7d5012bdc9a14f8385c89a5 (diff)
downloadqpid-python-f86e9093784d583f9af5068cbb5cdbc9a0f90e19.tar.gz
QPID-2646 patches
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1026915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid')
-rw-r--r--qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs23
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs113
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj1
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp29
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h6
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp20
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.h8
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj8
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp12
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h13
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp304
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h89
12 files changed, 572 insertions, 54 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs
index 0f649dcd36..4099571fe0 100644
--- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs
+++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs
@@ -29,7 +29,7 @@ namespace Apache.Qpid.AmqpTypes
// AMQP 0-10 delivery properties
private bool durable;
private Nullable<TimeSpan> timeToLive;
- private string routingKey;
+ private string subject;
// AMQP 0-10 message properties
private string replyToExchange;
@@ -50,7 +50,7 @@ namespace Apache.Qpid.AmqpTypes
{
get
{
- return ((this.routingKey != null) || this.durable || this.timeToLive.HasValue);
+ return ((this.subject != null) || this.durable || this.timeToLive.HasValue);
}
}
@@ -163,10 +163,19 @@ namespace Apache.Qpid.AmqpTypes
set { this.timeToLive = value; }
}
+ /// <summary>
+ /// Obsolete: switch to AMQP 1.0 "Subject" naming
+ /// </summary>
public string RoutingKey
{
- get { return this.routingKey; }
- set { this.routingKey = value; }
+ get { return this.subject; }
+ set { this.subject = value; }
+ }
+
+ public string Subject
+ {
+ get { return this.subject; }
+ set { this.subject = value; }
}
public string ReplyToExchange
@@ -200,7 +209,7 @@ namespace Apache.Qpid.AmqpTypes
public void Clear()
{
this.timeToLive = null;
- this.routingKey = null;
+ this.subject = null;
this.replyToRoutingKey = null;
this.replyToExchange = null;
this.durable = false;
@@ -251,9 +260,9 @@ namespace Apache.Qpid.AmqpTypes
this.replyToRoutingKey = other.replyToRoutingKey;
}
- if (other.routingKey != null)
+ if (other.subject != null)
{
- this.routingKey = other.routingKey;
+ this.subject = other.subject;
}
if (other.durable)
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
index a6f6ee6800..6f0ffd9815 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
@@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel
using System.Text;
using System.Threading;
using System.Globalization;
+ using System.Web;
using System.Xml;
// the thin interop layer that provides access to the Qpid AMQP client libraries
@@ -52,11 +53,11 @@ namespace Apache.Qpid.Channel
private bool shared;
private int prefetchLimit;
private string encoderContentType;
+ // AMQP subject/routing key
+ private string subject;
+ // Qpid addressing value for "qpid.subject" property
+ private string qpidSubject;
- // input = 0-10 queue, output = 0-10 exchange
- private string queueName;
-
- private String routingKey;
private BufferManager bufferManager;
private AmqpProperties outputMessageProperties;
@@ -85,7 +86,7 @@ namespace Apache.Qpid.Channel
this.remoteAddress = remoteAddress;
// pull out host, port, queue, and connection arguments
- this.ParseAmqpUri(remoteAddress.Uri);
+ string qpidAddress = this.UriToQpidAddress(remoteAddress.Uri, out subject);
this.encoder = msgEncoder;
string ct = String.Empty;
@@ -129,12 +130,14 @@ namespace Apache.Qpid.Channel
if (this.isInputChannel)
{
- this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, qpidAddress);
this.inputLink.PrefetchLimit = this.prefetchLimit;
}
else
{
- this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, qpidAddress);
+ this.subject = this.outputLink.DefaultSubject;
+ this.qpidSubject = this.outputLink.QpidSubject;
}
}
@@ -423,9 +426,14 @@ namespace Apache.Qpid.Channel
outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties);
}
- if (this.routingKey != null)
+ if (this.subject != null)
{
- outgoingProperties.RoutingKey = this.routingKey;
+ outgoingProperties.RoutingKey = this.subject;
+ }
+
+ if (this.qpidSubject != null)
+ {
+ outgoingProperties.PropertyMap["qpid.subject"] = new AmqpString(this.qpidSubject);
}
// Add the Properties set by the application on this particular message.
@@ -544,8 +552,7 @@ namespace Apache.Qpid.Channel
this.bufferManager.Clear();
}
- // "amqp:queue1" | "amqp:stocks@broker1.com" | "amqp:queue3?routingkey=key"
- private void ParseAmqpUri(Uri uri)
+ private string UriToQpidAddress(Uri uri, out string subject)
{
if (uri.Scheme != AmqpConstants.Scheme)
{
@@ -553,43 +560,83 @@ namespace Apache.Qpid.Channel
"The scheme {0} specified in address is not supported.", uri.Scheme), "uri");
}
- this.queueName = uri.LocalPath;
+ subject = "";
+ string path = uri.LocalPath;
+ string query = uri.Query;
+
+ // legacy... convert old style myqueue?routingkey=key to myqueue/key
- if ((this.queueName.IndexOf('@') != -1) && this.isInputChannel)
+ if (query.Length > 0)
{
- throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
- "Invalid input queue name: \"{0}\" specified.", this.queueName), "uri");
- }
+ if (!query.StartsWith("?"))
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "Invalid query argument."), "uri");
+ }
- // search out session parameters in the query portion of the URI
+ string routingParseKey = "routingkey=";
+ string subjectParseKey = "subject=";
+ char[] charSeparators = new char[] { '?', ';' };
+ string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries);
+ foreach (string s in args)
+ {
+ if (s.StartsWith(routingParseKey))
+ {
+ subject = s.Substring(routingParseKey.Length);
+ }
+ else if (s.StartsWith(subjectParseKey))
+ {
+ subject = s.Substring(subjectParseKey.Length);
+ }
+ else
+ {
+ if (s.Length > 0)
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "Invalid query argument {0}.", s), "uri");
+ }
+ }
+ }
- string routingParseKey = "routingkey=";
- char[] charSeparators = new char[] { '?', ';' };
- string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries);
- foreach (string s in args)
- {
- if (s.StartsWith(routingParseKey))
+ if (path.Contains("/"))
+ {
+ throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
+ "Invalid queue name {0}.", path), "uri");
+ }
+
+ if (path.Length == 0)
{
- this.routingKey = s.Substring(routingParseKey.Length);
+ // special case, user wants default exchange
+ return "//" + subject;
}
+
+ return path + "/" + subject;
}
- if (this.queueName == String.Empty)
+ // find subject in "myqueue/mysubject;{mode:browse}"
+ int pos = path.IndexOf('/');
+ if ((pos > -1) && (pos < path.Length + 1))
{
- if (this.isInputChannel)
+ subject = path.Substring(pos);
+ pos = subject.IndexOf(';');
+ if (pos == 0)
{
throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
- "Empty queue target specifier not allowed."), "uri");
+ "Empty subject in address {0}.", path), "uri");
}
- else
+
+ if (pos > 0)
{
- if (this.routingKey == null)
- {
- throw new ArgumentException(string.Format(CultureInfo.CurrentCulture,
- "No target queue or routing key specified."), "uri");
- }
+ subject = subject.Substring(0, pos);
}
}
+
+ if (subject.Length > 0)
+ {
+ subject = HttpUtility.UrlDecode(subject);
+ }
+
+ return HttpUtility.UrlDecode(path);
}
}
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
index 6bb059daf6..1eb811b425 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
+++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
@@ -97,6 +97,7 @@ under the License.
<RequiredTargetFramework>3.0</RequiredTargetFramework>
</Reference>
<Reference Include="System.Transactions" />
+ <Reference Include="System.Web" />
<Reference Include="System.XML" />
</ItemGroup>
<ItemGroup>
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
index d2adb41205..ac7c777d1f 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
@@ -357,17 +357,20 @@ bool AmqpSession::MessageStop(std::string &name)
return true;
}
-void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers, bool browsing)
{
lock l(sessionLock);
- // delimit with session dtx commands depending on the transaction context
- UpdateTransactionState(%l);
+ if (!browsing) {
+ // delimit with session dtx commands depending on the transaction context
+ UpdateTransactionState(%l);
+ }
CheckOpen();
sessionp->markCompleted(transfers, false);
- sessionp->messageAccept(transfers, false);
+ if (!browsing)
+ sessionp->messageAccept(transfers, false);
}
@@ -609,4 +612,22 @@ void AmqpSession::ReleaseCompletion(IntPtr completion) {
delete completion.ToPointer();
}
+
+// Non-exclusive borrowing for a "brief" period. I.e. several synced
+// commands (address resolution)
+
+IntPtr AmqpSession::BorrowNativeSession() {
+ lock l(sessionLock);
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs();
+ return (IntPtr) sessionp;
+}
+
+void AmqpSession::ReturnNativeSession() {
+ lock l(sessionLock);
+ DecrementSyncs();
+}
+
}}} // namespace Apache::Qpid::Cli
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
index 88ffd18dcc..7a49496805 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
@@ -78,7 +78,7 @@ public:
OutputLink^ CreateOutputLink(System::String^ targetQueue);
InputLink^ CreateInputLink(System::String^ sourceQueue);
- // 0-10 specific support
+ // 0-10 specific support; deprecated in favor of Qpid messaging addresses
InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange);
void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey);
@@ -90,7 +90,7 @@ internal:
void internalWaitForCompletion(IntPtr future);
void removeWaiter(CompletionWaiter^ waiter);
bool MessageStop(std::string &name);
- void AcceptAndComplete(SequenceSet& transfers);
+ void AcceptAndComplete(SequenceSet& transfers, bool browsing);
IntPtr BeginPhase0Flush(XaTransaction^);
void EndPhase0Flush(XaTransaction^, IntPtr);
IntPtr DtxStart(IntPtr xidp, bool, bool);
@@ -98,6 +98,8 @@ internal:
IntPtr DtxCommit(IntPtr xidp, bool onePhase);
IntPtr DtxRollback(IntPtr xidp);
void ReleaseCompletion(IntPtr completion);
+ IntPtr BorrowNativeSession();
+ void ReturnNativeSession();
property AmqpConnection^ Connection {
AmqpConnection^ get () { return connection; }
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
index 3245cd3540..2b0119e338 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -88,9 +88,12 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
waiters = gcnew Collections::Generic::List<MessageWaiter^>();
linkLock = waiters; // private and available
subscriptionLock = gcnew Object();
+ qpidAddress = QpidAddress::CreateAddress(sourceQueue, true);
+ qpidAddress->ResolveLink(session);
+ browsing = qpidAddress->Browsing;
try {
- std::string qname = QpidMarshal::ToNative(sourceQueue);
+ std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName);
if (temporary) {
qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true);
@@ -104,6 +107,15 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
settings.flowControl = FlowControl::messageCredit(0);
settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+ if (browsing) {
+ settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED;
+ settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE;
+ }
+ else {
+ settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED;
+ settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT;
+ }
+
Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
@@ -186,8 +198,10 @@ void InputLink::Cleanup()
{
ReleaseNative();
}
-
}
+
+ // Now that subscription is torn down, we can execute pending delete on remote node
+ qpidAddress->CleanupLink(amqpSession);
amqpSession->NotifyClosed();
}
@@ -699,7 +713,7 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
// subscriptionp->accept(frameSetID) is a slow sync operation in the native API
// so do it within the AsyncSession directly
- amqpSession->AcceptAndComplete(frameSetID);
+ amqpSession->AcceptAndComplete(frameSetID, browsing);
workingCredit--;
// check if more messages need to be requested from broker
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
index 2f96b91944..136d53d280 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
@@ -20,6 +20,7 @@
#pragma once
#include "MessageWaiter.h"
+#include "QpidAddress.h"
namespace Apache {
namespace Qpid {
@@ -58,6 +59,9 @@ private:
// working credit low water mark
int minWorkingCredit;
+ bool browsing;
+ QpidAddress^ qpidAddress;
+
void Cleanup();
void ReleaseNative();
bool haveMessage();
@@ -97,6 +101,10 @@ public:
void set (int value);
}
+ property bool Browsing {
+ bool get () { return browsing; }
+ }
+
};
}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
index 2056c97d57..fe288cbe76 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
+++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
@@ -409,6 +409,10 @@
>
</File>
<File
+ RelativePath=".\QpidAddress.cpp"
+ >
+ </File>
+ <File
RelativePath=".\InputLink.cpp"
>
</File>
@@ -455,6 +459,10 @@
>
</File>
<File
+ RelativePath=".\QpidAddress.h"
+ >
+ </File>
+ <File
RelativePath=".\InputLink.h"
>
</File>
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
index 27725b8207..de7141dadb 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
@@ -48,13 +48,14 @@ using namespace std;
using namespace Apache::Qpid::AmqpTypes;
-OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) :
+OutputLink::OutputLink(AmqpSession^ session, String^ address) :
amqpSession(session),
- queue(defaultQueue),
disposed(false),
maxFrameSize(session->Connection->MaxFrameSize),
finalizing(false)
{
+ qpidAddress = QpidAddress::CreateAddress(address, false);
+ qpidAddress->ResolveLink(session);
}
void OutputLink::Cleanup()
@@ -67,6 +68,8 @@ void OutputLink::Cleanup()
disposed = true;
}
+ // process any pending queue delete
+ qpidAddress->CleanupLink(amqpSession);
amqpSession->NotifyClosed();
}
@@ -217,7 +220,8 @@ void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout)
ManagedToNative(amqpMessage);
MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
- CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr);
+ CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream,
+ timeout, false, nullptr, nullptr);
if (waiter != nullptr) {
waiter->WaitForCompletion();
@@ -234,7 +238,7 @@ IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout,
ManagedToNative(amqpMessage);
MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
- CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state);
+ CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, true, callback, state);
return waiter;
}
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
index 1f049a7412..e30d1cc79f 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
@@ -19,6 +19,8 @@
#pragma once
+#include "QpidAddress.h"
+
namespace Apache {
namespace Qpid {
namespace Interop {
@@ -34,7 +36,7 @@ public ref class OutputLink
{
private:
AmqpSession^ amqpSession;
- String^ queue;
+ QpidAddress^ qpidAddress;
bool disposed;
bool finalizing;
void Cleanup();
@@ -58,6 +60,15 @@ public:
AmqpTypes::AmqpProperties^ get () { return defaultProperties; }
void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; }
}
+
+ property String^ DefaultSubject {
+ String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->RoutingKey; }
+ }
+
+ property String^ QpidSubject {
+ String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->Subject; }
+ }
+
};
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp
new file mode 100644
index 0000000000..bfae1ab313
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+
+/*
+ * This program parses strings of the form "node/subject;{options}" as
+ * used in the Qpid messaging API. It provides basic wiring
+ * capabilities to create/delete temporary queues (to topic
+ * subsciptions) and unbound "point and shoot" queues.
+ */
+
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <oletx2xa.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/client/Future.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "AmqpMessage.h"
+#include "MessageBodyStream.h"
+#include "InputLink.h"
+#include "OutputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+#include "QpidAddress.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+QpidAddress::QpidAddress(String^ s, bool isInput) {
+ address = s;
+ nodeName = s;
+ isInputChannel = isInput;
+ isQueue = true;
+
+ if (address->StartsWith("//")) {
+ // special case old style address to default exchange,
+ // no options, output only
+ if ((s->IndexOf(';') != -1) || isInputChannel)
+ throw gcnew ArgumentException("Invalid 0-10 address: " + address);
+ nodeName = nodeName->Substring(2);
+ return;
+ }
+
+ String^ options = nullptr;
+ int pos = s->IndexOf(';');
+ if (pos != -1) {
+ options = s->Substring(pos + 1);
+ nodeName = s->Substring(0, pos);
+
+ if (options->Length > 0) {
+ if (!options->StartsWith("{") || !options->EndsWith("}"))
+ throw gcnew ArgumentException("Invalid address: " + address);
+ options = options->Substring(1, options->Length - 2);
+ array<String^>^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries);
+
+ if ((subOpts->Length % 2) != 0)
+ throw gcnew ArgumentException("Bad address (options): " + address);
+
+ for (int i=0; i < subOpts->Length; i += 2) {
+ String^ opt = subOpts[i];
+ String^ optArg = subOpts[i+1];
+ if (opt->Equals("create")) {
+ creating = PolicyApplies(optArg);
+ }
+ else if (opt->Equals("delete")) {
+ deleting = PolicyApplies(optArg);
+ }
+ else if (opt->Equals("mode")) {
+ if (optArg->Equals("browse")) {
+ browsing = isInputChannel;
+ }
+ else if (!optArg->Equals("consume")) {
+ throw gcnew ArgumentException("Invalid browsing option: " + optArg);
+ }
+ }
+ else if (opt->Equals("assert") || opt->Equals("node")) {
+ throw gcnew ArgumentException("Unsupported address option: " + opt);
+ }
+ else {
+ throw gcnew ArgumentException("Bad address option: " + opt);
+ }
+ }
+ }
+ else
+ options = nullptr;
+ }
+
+ pos = nodeName->IndexOf('/');
+ if (pos != -1) {
+ subject = nodeName->Substring(pos + 1);
+ if (String::IsNullOrEmpty(subject))
+ subject = nullptr;
+ nodeName = nodeName->Substring(0, pos);
+ }
+}
+
+
+QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) {
+ QpidAddress^ addr = gcnew QpidAddress(s, isInput);
+ return addr;
+}
+
+
+void QpidAddress::ResolveLink(AmqpSession^ amqpSession) {
+
+ AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer();
+ if (asyncSessionp == NULL)
+ throw gcnew ObjectDisposedException("session");
+
+ deleteName = nullptr;
+ isQueue = true;
+
+ try {
+ Session session = sync(*asyncSessionp);
+ std::string n_name = QpidMarshal::ToNative(nodeName);
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name);
+
+ bool queueFound = !result.getQueueNotFound();
+ bool exchangeFound = !result.getExchangeNotFound();
+
+ if (isInputChannel) {
+
+ if (queueFound) {
+ linkName = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else if (exchangeFound) {
+ isQueue = false;
+ String^ tmpkey = nullptr;
+ String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString();
+ bool haveSubject = !String::IsNullOrEmpty(subject);
+ FieldTable bindArgs;
+
+ std::string exchangeType = session.exchangeQuery(n_name).getType();
+ if (exchangeType == "topic") {
+ tmpkey = haveSubject ? subject : "#";
+ }
+ else if (exchangeType == "fanout") {
+ tmpkey = tmpname;
+ }
+ else if (exchangeType == "headers") {
+ tmpkey = haveSubject ? subject : "match-all";
+ if (haveSubject)
+ bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject));
+ bindArgs.setString("x-match", "all");
+ }
+ else if (exchangeType == "xml") {
+ tmpkey = haveSubject ? subject : "";
+ if (haveSubject) {
+ String^ v = "declare variable $qpid.subject external; $qpid.subject = '" +
+ subject + "'";
+ bindArgs.setString("xquery", QpidMarshal::ToNative(v));
+ }
+ else
+ bindArgs.setString("xquery", "true()");
+ }
+ else {
+ tmpkey = haveSubject ? subject : "";
+ }
+
+ std::string qn = QpidMarshal::ToNative(tmpname);
+ session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true);
+ bool success = false;
+ try {
+ session.exchangeBind(arg::exchange=n_name, arg::queue=qn,
+ arg::bindingKey=QpidMarshal::ToNative(tmpkey),
+ arg::arguments=bindArgs);
+ bindKey = tmpkey; // remember for later cleanup
+ success = true;
+ }
+ finally {
+ if (!success)
+ session.queueDelete(arg::queue=qn);
+ }
+ linkName = tmpname;
+ deleteName = tmpname;
+ deleting = true;
+ }
+ else if (creating) {
+ // only create "point and shoot" queues for now
+ session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName));
+ // leave unbound
+
+ linkName = nodeName;
+
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else {
+ throw gcnew ArgumentException("AMQP broker node not found: " + nodeName);
+ }
+ }
+ else {
+ // Output channel
+
+ bool oldStyleUri = address->StartsWith("//");
+
+ if (queueFound) {
+ linkName = ""; // default exchange for point and shoot
+ routingKey = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else if (exchangeFound && !oldStyleUri) {
+ isQueue = false;
+ linkName = nodeName;
+ routingKey = subject;
+ }
+ else if (creating) {
+ // only create "point and shoot" queues for now
+ session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName));
+ // leave unbound
+ linkName = "";
+ routingKey = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else {
+ throw gcnew ArgumentException("AMQP broker node not found: " + nodeName);
+ }
+ }
+ }
+ finally {
+ amqpSession->ReturnNativeSession();
+ }
+}
+
+void QpidAddress::CleanupLink(AmqpSession^ amqpSession) {
+ if (deleteName == nullptr)
+ return;
+
+ AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer();
+ if (asyncSessionp == NULL) {
+ // TODO: log it: can't undo tear down actions
+ return;
+ }
+
+ try {
+ Session session = sync(*asyncSessionp);
+ std::string q = QpidMarshal::ToNative(deleteName);
+ if (isInputChannel && !isQueue) {
+ // undo the temp wiring to the topic
+ session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q,
+ arg::bindingKey=QpidMarshal::ToNative(bindKey));
+ }
+ session.queueDelete(q);
+ }
+ catch (Exception^ e) {
+ // TODO: log it
+ }
+ finally {
+ amqpSession->ReturnNativeSession();
+ }
+}
+
+bool QpidAddress::PolicyApplies(String^ mode) {
+ if (mode->Equals("always"))
+ return true;
+ if (mode->Equals("sender"))
+ return !isInputChannel;
+ if (mode->Equals("receiver"))
+ return isInputChannel;
+ if (mode->Equals("never"))
+ return false;
+
+ throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address));
+}
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h
new file mode 100644
index 0000000000..d24317c2aa
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "MessageWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+
+public ref class QpidAddress
+{
+private:
+ QpidAddress(String^ address, bool isInput);
+
+ // the original Qpid messaging address string, with WCF uri sematics removed, and URL decoded
+ String^ address;
+
+ String^ nodeName;
+ // "qpid.subject"
+ String^ subject;
+ // 0-10 routing key (Output channels only)
+ String^ routingKey;
+
+ String^ linkName;
+ String^ deleteName;
+ String^ bindKey;
+
+ // node type: queue/topic
+ bool isQueue;
+
+ // direction
+ bool isInputChannel;
+
+ bool creating;
+ bool deleting;
+ bool browsing;
+
+ bool PolicyApplies(String^ mode);
+
+internal:
+ static QpidAddress^ CreateAddress(String ^s, bool isInput);
+ void ResolveLink(AmqpSession^ amqpSession);
+ void CleanupLink(AmqpSession^ amqpSession);
+
+ property String^ LinkName {
+ String^ get () { return linkName; }
+ }
+
+ property String^ Subject {
+ String^ get () { return subject; }
+ }
+
+ property String^ RoutingKey {
+ String^ get () { return routingKey; }
+ }
+
+ property bool Browsing {
+ bool get () { return browsing; }
+ }
+
+};
+
+}}} // namespace Apache::Qpid::Interop