From 69d56a233cd6dbe02287c8a237a18fbc7505f140 Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Thu, 17 Dec 2009 16:03:54 +0000 Subject: Apply patches QPID-2128-2.patch, cppbld.patch and interop.tx.patch; resolves QPID-2128. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@891783 13f79535-47bb-0310-9956-ffa450edef68 --- .../Qpid/Channel/AmqpTransportBindingElement.cs | 7 +- wcf/src/Apache/Qpid/Channel/Channel.csproj | 1 + wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp | 664 +++++++++++++++++++++ wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp | 14 + wcf/src/Apache/Qpid/Interop/AmqpConnection.h | 26 +- wcf/src/Apache/Qpid/Interop/AmqpSession.cpp | 393 ++++++++++-- wcf/src/Apache/Qpid/Interop/AmqpSession.h | 38 +- wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp | 285 +++++++++ wcf/src/Apache/Qpid/Interop/DtxResourceManager.h | 76 +++ wcf/src/Apache/Qpid/Interop/InputLink.cpp | 86 ++- wcf/src/Apache/Qpid/Interop/InputLink.h | 2 + wcf/src/Apache/Qpid/Interop/Interop.vcproj | 23 +- wcf/src/Apache/Qpid/Interop/XaTransaction.cpp | 525 ++++++++++++++++ wcf/src/Apache/Qpid/Interop/XaTransaction.h | 96 +++ 14 files changed, 2153 insertions(+), 83 deletions(-) create mode 100644 wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp create mode 100644 wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp create mode 100644 wcf/src/Apache/Qpid/Interop/DtxResourceManager.h create mode 100644 wcf/src/Apache/Qpid/Interop/XaTransaction.cpp create mode 100644 wcf/src/Apache/Qpid/Interop/XaTransaction.h (limited to 'wcf/src') diff --git a/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs index 08c565af18..7993252309 100644 --- a/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs +++ b/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs @@ -25,7 +25,7 @@ namespace Apache.Qpid.Channel using System.ServiceModel.Description; using Apache.Qpid.AmqpTypes; - public class AmqpTransportBindingElement : TransportBindingElement + public class AmqpTransportBindingElement : TransportBindingElement, ITransactedBindingElement { AmqpChannelProperties channelProperties; bool shared; @@ -112,6 +112,11 @@ namespace Apache.Qpid.Channel set { this.shared = value; } } + public bool TransactedReceiveEnabled + { + get { return true; } + } + public TransferMode TransferMode { get { return this.channelProperties.TransferMode; } diff --git a/wcf/src/Apache/Qpid/Channel/Channel.csproj b/wcf/src/Apache/Qpid/Channel/Channel.csproj index 7484bc38ac..ac90fb7d64 100644 --- a/wcf/src/Apache/Qpid/Channel/Channel.csproj +++ b/wcf/src/Apache/Qpid/Channel/Channel.csproj @@ -90,6 +90,7 @@ under the License. 3.0 + diff --git a/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp b/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp new file mode 100644 index 0000000000..f9d8bd8521 --- /dev/null +++ b/wcf/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp @@ -0,0 +1,664 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + + +// +// This module provides the backend recovery driver for Windows resource managers based on +// the IDtcToXaHelperSinglePipe interface. The dll is loaded (LoadLibrary) directly into DTC +// itself and runs at a different protection level from the resource manager instance, which +// runs inside the application. +// +// The DTC dynamically loads this file, calls GetXaSwitch() to access the XA interface +// implementation and unloads the dll when done. +// +// This DTC plugin is only called for registration and recovery. Each time the application +// registers the Qpid resource manager with DTC, the plugin is loaded and a successful +// connection via xa_open is confirmed before completing registration and saving the DSN +// connection string in the DTC log for possible recovery. On recovery, the DSN is re-used to +// restablish a new connection with the broker and perform recovery. +// +// Because this plugin is not involved in coordinating any active transactions it only needs to +// partially implement the XA interface. +// +// For the same reason, the locking strategy is simple. A single global lock is used. +// Whenever networking activity is about to take place, the lock is relinquished and retaken +// soon thereafter. + + +#include +#include +#include +#include +#include + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Connection.h" + + +#include +#include +#include + +namespace Apache { +namespace Qpid { +namespace DtcPlugin { + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::framing::dtx; + +class ResourceManager +{ +private: + Connection qpidConnection; + Session qpidSession; + bool active; + std::string host; + int port; + int rmid; + std::vector inDoubtXids; + // current scan position, or -1 if no scan + int cursor; +public: + ResourceManager(int id, std::string h, int p) : rmid(id), host(h), port(p), active(false), cursor(-1) {} + ~ResourceManager() {} + INT open(); + INT close(); + INT commit(XID *xid); + INT rollback(XID *xid); + INT recover(XID *xids, long count, long flags); +}; + + +CRITICAL_SECTION rmLock; + +std::map rmMap; +HMODULE thisDll = NULL; +bool memLocked = false; + +#define QPIDHMCHARS 512 + +void pinDll() { + if (!memLocked) { + char thisDllName[QPIDHMCHARS]; + HMODULE ignore; + + DWORD nc = GetModuleFileName(thisDll, thisDllName, QPIDHMCHARS); + if ((nc > 0) && (nc < QPIDHMCHARS)) { + memLocked = GetModuleHandleEx(GET_MODULE_HANDLE_EX_FLAG_PIN, thisDllName, &ignore); + } + } +} + + +void XaToQpid(XID &winXid, Xid &qpidXid) { + // convert from XA defined structure XID to the Qpid framing structure + qpidXid.setFormat((uint32_t) winXid.formatID); + int bqualPos = 0; + if (winXid.gtrid_length > 0) { + qpidXid.setGlobalId(std::string(winXid.data, winXid.gtrid_length)); + bqualPos = winXid.gtrid_length; + } + if (winXid.bqual_length > 0) { + qpidXid.setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length)); + } +} + + +// this function assumes that the qpidXid has already been validated for the memory copy + +void QpidToXa(Xid &qpidXid, XID &winXid) { + // convert from the Qpid framing structure to the XA defined structure XID + winXid.formatID = qpidXid.getFormat(); + + const std::string& global_s = qpidXid.getGlobalId(); + size_t gl = global_s.size(); + winXid.gtrid_length = (long) gl; + if (gl > 0) + global_s.copy(winXid.data, gl); + + const std::string branch_s = qpidXid.getBranchId(); + size_t bl = branch_s.size(); + winXid.bqual_length = (long) bl; + if (bl > 0) + branch_s.copy(winXid.data + gl, bl); +} + + +/* parse string from AmqpConnection.h + + this info will eventually include authentication tokens + + dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host, + System::Diagnostics::Process::GetCurrentProcess()->Id, + AppDomain::CurrentDomain->Id); +*/ + +bool parseDsn (const char *dsn, std::string& host, int& port) { + if (dsn == NULL) + return false; + + size_t len = strnlen(dsn, 1025); + if (len > 1024) + return false; + + int firstDot = 0; + for (int i = 0; i < len; i++) + if (dsn[i] == '.') { + firstDot = i; + break; + } + if (!firstDot) + return false; + + // look for 2 dots side by side to indicate end of the host + int doubleDot = 0; + for (int i = firstDot + 1; i < (len - 1); i++) + if ((dsn[i] == '.') && (dsn[i+1] == '.')) { + doubleDot = i; + break; + } + if (!doubleDot) + return false; + + port = 0; + for (int i = 0; i < firstDot; i++) { + char c = dsn[i]; + if ((c < '0') || (c > '9')) + return false; + port = (10 * port) + (c - '0'); + } + + host.assign(dsn + firstDot + 1, (doubleDot - firstDot) - 1); + return true; +} + + +INT ResourceManager::open() { + INT rv = XAER_RMERR; // placeholder until we successfully connect to resource + active = true; + LeaveCriticalSection(&rmLock); + + try { + qpidConnection.open(host, port); + qpidSession = qpidConnection.newSession(); + rv = XA_OK; +/* +TODO: logging + } catch (const qpid::Exception& error) { + // log it + } catch (const std::exception& e2) { + // log it +*/ + } catch (...) { + // TODO: log it + } + + EnterCriticalSection(&rmLock); + active = false; + return rv; +} + + +INT ResourceManager::close() { + // should never be called when already sending other commands to broker + if (active) + return XAER_PROTO; + + INT rv = XAER_RMERR; // placeholder until we successfully close resource + active = true; + LeaveCriticalSection(&rmLock); + try { + if (qpidSession.isValid()) { + qpidSession.close(); + } + if (qpidConnection.isOpen()) { + qpidConnection.close(); + } + } catch (...) { + // TODO: log it + } + + EnterCriticalSection(&rmLock); + active = false; + + if (!qpidConnection.isOpen()) { + rv = XA_OK; + } + return rv; +} + + +INT ResourceManager::commit(XID *xid) { + if (active) + return XAER_PROTO; + + INT rv = XAER_RMFAIL; + active = true; + LeaveCriticalSection(&rmLock); + + try { + qpid::framing::Xid qpidXid; + XaToQpid(*xid, qpidXid); + + XaResult xaResult = qpidSession.dtxCommit(qpidXid, false, true); + if (xaResult.hasStatus()) { + uint16_t status = xaResult.getStatus(); + switch ((XaStatus) status) { + case XA_STATUS_XA_OK: + case XA_STATUS_XA_RDONLY: + case XA_STATUS_XA_HEURCOM: + rv = XA_OK; + break; + + default: + // commit failed and a retry won't fix + rv = XAER_RMERR; + break; + } + + } + } catch (...) { + // TODO: log it + } + + EnterCriticalSection(&rmLock); + active = false; + return rv; +} + + +INT ResourceManager::rollback(XID *xid) { + if (active) + return XAER_PROTO; + + INT rv = XAER_RMFAIL; + active = true; + LeaveCriticalSection(&rmLock); + + try { + qpid::framing::Xid qpidXid; + XaToQpid(*xid, qpidXid); + + XaResult xaResult = qpidSession.dtxRollback(qpidXid, true); + if (xaResult.hasStatus()) { + uint16_t status = xaResult.getStatus(); + switch ((XaStatus) status) { + case XA_STATUS_XA_OK: + case XA_STATUS_XA_HEURRB: + rv = XA_OK; + break; + + default: + // RM internal error + rv = XA_RBPROTO; + break; + } + } + } catch (...) { + // TODO: log it + } + + EnterCriticalSection(&rmLock); + active = false; + return rv; +} + + +INT ResourceManager::recover(XID *xids, long count, long flags) { + if (active) + return XAER_PROTO; + + if ((xids == NULL) && (count != 0)) + return XAER_INVAL; + + if (count < 0) + return XAER_INVAL; + + if (!(flags & TMSTARTRSCAN) && (cursor == -1)) + // no existing scan and no scan requested + return XAER_INVAL; + + INT status = XA_OK; + + if (flags & TMSTARTRSCAN) { + // start a fresh scan + cursor = -1; + inDoubtXids.clear(); + active = true; + LeaveCriticalSection(&rmLock); + + try { + // status if we can't talk to the broker + status = XAER_RMFAIL; + std::vector wireFormatXids; + + DtxRecoverResult dtxrr = qpidSession.dtxRecover(true); + + // status if we can't process the xids + status = XAER_RMERR; + dtxrr.getInDoubt().collect(wireFormatXids); + size_t nXids = wireFormatXids.size(); + + if (nXids > 0) { + StructHelper decoder; + Xid qpidXid; + for (int i = 0; i < nXids; i++) { + decoder.decode (qpidXid, wireFormatXids[i]); + inDoubtXids.push_back(qpidXid); + } + + // if we got here the decoder validated the Xids + status = XA_OK; + + // make sure none are too big, just in case + + for (int i = 0; i < nXids; i++) { + Xid& xid = inDoubtXids[i]; + size_t l1 = xid.hasGlobalId() ? xid.getGlobalId().size() : 0; + size_t l2 = xid.hasBranchId() ? xid.getBranchId().size() : 0; + if ((l1 > MAXGTRIDSIZE) || (l2 > MAXBQUALSIZE) || + ((l1 + l2) > XIDDATASIZE)) { + status = XAER_RMERR; + break; + } + } + } + else { + // nXids == 0, the previously cleared inDoubtXids is correctly populated + status = XA_OK; + } + + if (status == XA_OK) + cursor = 0; + } catch (...) { + // TODO: log it + } + + EnterCriticalSection(&rmLock); + active = false; + } + else { + // TMSTARTRSCAN not set, is there an existing scan to work from? + if (cursor == -1) + return XAER_INVAL; + } + + if (status != XA_OK) + return status; + + INT actualCount = count; + if (count > 0) { + int nAvailable = (int) inDoubtXids.size() - cursor; + if (nAvailable < count) + actualCount = nAvailable; + + for (int i = 0; i < actualCount; i++) { + Xid& qpidXid = inDoubtXids[i + cursor]; + QpidToXa(qpidXid, xids[i]); + } + } + + if (flags & TMENDRSCAN) { + cursor = -1; + inDoubtXids.clear(); + } + + return actualCount; +} + + +// Call with lock held + +ResourceManager* findRm(int rmid) { + if (rmMap.find(rmid) == rmMap.end()) { + return NULL; + } + return rmMap[rmid]; +} + + +INT __cdecl xa_open (char *xa_info, int rmid, long flags) { + if (flags & TMASYNC) + return XAER_ASYNC; + + INT rv = XAER_RMERR; + EnterCriticalSection(&rmLock); + + ResourceManager* rmp = findRm(rmid); + if (rmp != NULL) { + // error: already in use + rv = XAER_PROTO; + } + else { + std::string brokerHost; + int brokerPort; + if (parseDsn(xa_info, brokerHost, brokerPort)) { + + try { + rmp = new ResourceManager(rmid, brokerHost, brokerPort); + + rv = rmp->open(); + if (rv != XA_OK) { + delete (rmp); + } + else { + rmMap[rmid] = rmp; + } + } catch (...) {} + } + else { + rv = XAER_INVAL; + } + } + + LeaveCriticalSection(&rmLock); + return rv; +} + + +INT __cdecl xa_close (char *xa_info, int rmid, long flags) { + if (flags & TMASYNC) + return XAER_ASYNC; + + INT rv = XAER_RMERR; + + EnterCriticalSection(&rmLock); + ResourceManager* rmp = findRm(rmid); + + if (rmp == NULL) { + // can close multiple times + rv = XA_OK; + } + else { + rv = rmp->close(); + rmMap.erase(rmid); + try { + delete (rmp); + } catch (...) { + // TODO: log it + } + } + + LeaveCriticalSection(&rmLock); + return rv; +} + + +INT __cdecl xa_commit (XID *xid, int rmid, long flags) { + if (flags & TMASYNC) + return XAER_ASYNC; + + INT rv = XAER_RMFAIL; + + EnterCriticalSection(&rmLock); + ResourceManager* rmp = findRm(rmid); + + if (rmp == NULL) { + rv = XAER_INVAL; + } + else { + rv = rmp->commit(xid); + } + + LeaveCriticalSection(&rmLock); + return rv; +} + + +INT __cdecl xa_rollback (XID *xid, int rmid, long flags) { + if (flags & TMASYNC) + return XAER_ASYNC; + + INT rv = XAER_RMFAIL; + + EnterCriticalSection(&rmLock); + ResourceManager* rmp = findRm(rmid); + + if (rmp == NULL) { + rv = XAER_INVAL; + } + else { + rv = rmp->rollback(xid); + } + + LeaveCriticalSection(&rmLock); + return rv; +} + + +INT __cdecl xa_recover (XID *xids, long count, int rmid, long flags) { + INT rv = XAER_RMFAIL; + + EnterCriticalSection(&rmLock); + ResourceManager* rmp = findRm(rmid); + + if (rmp == NULL) { + rv = XAER_PROTO; + } + else { + rv = rmp->recover(xids, count, flags); + } + + LeaveCriticalSection(&rmLock); + return rv; +} + + +INT __cdecl xa_start (XID *xid, int rmid, long flags) { + // not used in recovery + return XAER_PROTO; +} + + +INT __cdecl xa_end (XID *xid, int rmid, long flags) { + // not used in recovery + return XAER_PROTO; +} + + +INT __cdecl xa_prepare (XID *xid, int rmid, long flags) { + // not used in recovery + return XAER_PROTO; +} + + +INT __cdecl xa_forget (XID *xid, int rmid, long flags) { + // not used in recovery + return XAER_PROTO; +} + + +INT __cdecl xa_complete (int *handle, int *retval, int rmid, long flags) { + // not used in recovery + return XAER_PROTO; +} + + + +xa_switch_t xaSwitch; + +HRESULT __cdecl GetQpidXaSwitch (DWORD XaSwitchFlags, xa_switch_t ** ppXaSwitch) +{ + // needed for now due to implicit use of FreeLibrary in WSACleanup() in qpid/cpp/src/qpid/sys/windows/Socket.cpp + pinDll(); + + if (xaSwitch.xa_open_entry != xa_open) { + + xaSwitch.xa_open_entry = xa_open; + xaSwitch.xa_close_entry = xa_close; + xaSwitch.xa_start_entry = xa_start; + xaSwitch.xa_end_entry = xa_end; + xaSwitch.xa_prepare_entry = xa_prepare; + xaSwitch.xa_commit_entry = xa_commit; + xaSwitch.xa_rollback_entry = xa_rollback; + xaSwitch.xa_recover_entry = xa_recover; + xaSwitch.xa_forget_entry = xa_forget; + xaSwitch.xa_complete_entry = xa_complete; + + strcpy_s(xaSwitch.name, RMNAMESZ, "qpidxarm"); + xaSwitch.flags = TMNOMIGRATE; + xaSwitch.version = 0; + } + *ppXaSwitch = &xaSwitch; + return S_OK; +} + + + + +}}} // namespace Apache::Qpid::DtcPlugin + + +// GetXaSwitch + +extern "C" { + + __declspec(dllexport) HRESULT __cdecl GetXaSwitch (DWORD XaSwitchFlags, xa_switch_t ** ppXaSwitch) + { + return Apache::Qpid::DtcPlugin::GetQpidXaSwitch (XaSwitchFlags, ppXaSwitch); + } +} + + +// dllmain + +BOOL APIENTRY DllMain( HMODULE hModule, + DWORD ul_reason_for_call, + LPVOID lpReserved) +{ + + switch (ul_reason_for_call) + { + case DLL_PROCESS_ATTACH: + InitializeCriticalSection(&Apache::Qpid::DtcPlugin::rmLock); + Apache::Qpid::DtcPlugin::thisDll = hModule; + break; + + case DLL_PROCESS_DETACH: + DeleteCriticalSection(&Apache::Qpid::DtcPlugin::rmLock); + break; + + case DLL_THREAD_ATTACH: + case DLL_THREAD_DETACH: + break; + } + return TRUE; +} + diff --git a/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp index 02d6c7ab18..c3afdf2280 100644 --- a/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp +++ b/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp @@ -19,6 +19,7 @@ #include #include +#include #include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" @@ -31,6 +32,8 @@ #include "AmqpSession.h" #include "QpidMarshal.h" #include "QpidException.h" +#include "DtxResourceManager.h" +#include "XaTransaction.h" namespace Apache { namespace Qpid { @@ -66,6 +69,9 @@ AmqpConnection::AmqpConnection(String^ server, int port) : success = true; const ConnectionSettings& settings = connectionp->getNegotiatedSettings(); this->maxFrameSize = settings.maxFrameSize; + this->host = server; + this->port = port; + this->isOpen = true; } catch (const qpid::Exception& error) { String^ errmsg = gcnew String(error.what()); openException = gcnew QpidException(errmsg); @@ -80,6 +86,12 @@ AmqpConnection::AmqpConnection(String^ server, int port) : } } +AmqpConnection^ AmqpConnection::Clone() { + if (disposed) + throw gcnew ObjectDisposedException("AmqpConnection.Clone"); + return gcnew AmqpConnection (this->host, this->port); +} + void AmqpConnection::Cleanup() { { @@ -91,6 +103,7 @@ void AmqpConnection::Cleanup() try { // let the child sessions clean up + for each(AmqpSession^ s in sessions) { s->ConnectionClosed(); } @@ -98,6 +111,7 @@ void AmqpConnection::Cleanup() finally { if (connectionp != NULL) { + isOpen = false; connectionp->close(); delete connectionp; connectionp = NULL; diff --git a/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/wcf/src/Apache/Qpid/Interop/AmqpConnection.h index 2641391e82..6533185fa1 100644 --- a/wcf/src/Apache/Qpid/Interop/AmqpConnection.h +++ b/wcf/src/Apache/Qpid/Interop/AmqpConnection.h @@ -28,6 +28,7 @@ using namespace std; using namespace qpid::client; ref class AmqpSession; +ref class DtxResourceManager; public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs); @@ -35,21 +36,44 @@ public ref class AmqpConnection { private: Connection* connectionp; - void Cleanup(); + String^ host; + int port; bool disposed; Collections::Generic::List^ sessions; bool isOpen; int busyCount; int maxFrameSize; + DtxResourceManager^ dtxResourceManager; + void Cleanup(); + // unique string used for distributed transactions + String^ dataSourceName; internal: void NotifyBusy(); void NotifyIdle(); + AmqpConnection^ Clone(); property int MaxFrameSize { int get () { return maxFrameSize; } } + property DtxResourceManager^ CachedResourceManager { + DtxResourceManager^ get () { return dtxResourceManager; } + void set (DtxResourceManager^ value) { dtxResourceManager = value; } + } + + property String^ DataSourceName { + // Note: any change to this format has to be reflected in the DTC plugin's xa_open() + String^ get() { + if (dataSourceName == nullptr) { + dataSourceName = String::Format("{0}.{1}..AMQP.{2}.{3}", port, host, + System::Diagnostics::Process::GetCurrentProcess()->Id, + AppDomain::CurrentDomain->Id); + } + return dataSourceName; + } + } + public: AmqpConnection(System::String^ server, int port); ~AmqpConnection(); diff --git a/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp index 425a592509..d2adb41205 100644 --- a/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp +++ b/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp @@ -19,6 +19,7 @@ #include #include +#include #include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" @@ -28,6 +29,7 @@ #include "qpid/client/Message.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/Future.h" +#include "qpid/framing/Xid.h" #include "AmqpConnection.h" #include "AmqpSession.h" @@ -37,6 +39,8 @@ #include "OutputLink.h" #include "QpidMarshal.h" #include "QpidException.h" +#include "XaTransaction.h" +#include "DtxResourceManager.h" namespace Apache { namespace Qpid { @@ -44,6 +48,7 @@ namespace Interop { using namespace System; using namespace System::Runtime::InteropServices; +using namespace System::Transactions; using namespace msclr; using namespace qpid::client; @@ -55,15 +60,20 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon sessionp(NULL), sessionImplp(NULL), subs_mgrp(NULL), - openCount(0) + helperRunning(false), + openCount(0), + syncCount(0), + closing(false), + dtxEnabled(false) { bool success = false; - try { sessionp = new qpid::client::AsyncSession; *sessionp = qpidConnectionp->newSession(); subs_mgrp = new SubscriptionManager (*sessionp); waiters = gcnew Collections::Generic::List(); + sessionLock = waiters; // waiters convenient and not publicly visible + openCloseLock = gcnew Object(); success = true; } finally { if (!success) { @@ -77,29 +87,36 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon void AmqpSession::Cleanup() { + bool connected = connection->IsOpen; + if (subs_mgrp != NULL) { - subs_mgrp->stop(); + if (connected) + subs_mgrp->stop(); delete subs_mgrp; subs_mgrp = NULL; } - if (localQueuep != NULL) { - delete localQueuep; - localQueuep = NULL; - } - if (sessionp != NULL) { - sessionp->close(); + if (connected) { + sessionp->close(); + } delete sessionp; sessionp = NULL; sessionImplp = NULL; } +} - if (connectionp != NULL) { - connectionp->close(); - delete connectionp; - connectionp = NULL; - } + +static qpid::framing::Xid& getXid(XaTransaction^ xaTx) +{ + return *((qpid::framing::Xid *)xaTx->XidHandle.ToPointer()); +} + + +void AmqpSession::CheckOpen() +{ + if (closing) + throw gcnew ObjectDisposedException("AmqpSession"); } @@ -107,7 +124,41 @@ void AmqpSession::Cleanup() void AmqpSession::ConnectionClosed() { - lock l(waiters); + lock l(sessionLock); + + if (closing) + return; + + closing = true; + + if (connection->IsOpen) { + // send closing handshakes... + + if (dtxEnabled) { + // session may close before all its transactions complete, at least force the phase 0 flush + if (pendingTransactions->Count > 0) { + array^ txArray = pendingTransactions->ToArray(); + l.release(); + for each (XaTransaction^ xaTx in txArray) { + //xaTx->SessionClosing(this); + xaTx->WaitForCompletion(); + } + l.acquire(); + } + } + + WaitLastSync (%l); + // Assert pendingTransactions->Count == 0 + + if (openXaTransaction != nullptr) { + // send final dtxend + sessionp->dtxEnd(getXid(openXaTransaction), false, true, false); + openXaTransaction = nullptr; + openSystemTransaction = nullptr; + // this operation will complete by the time Cleanup() returns + } + } + Cleanup(); } @@ -119,10 +170,14 @@ InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue) InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange) { + lock ocl(openCloseLock); + lock l(sessionLock); + CheckOpen(); + InputLink^ link = gcnew InputLink (this, sourceQueue, sessionp, subs_mgrp, exclusive, temporary, filterKey, exchange); { - lock l(waiters); if (openCount == 0) { + l.release(); connection->NotifyBusy(); } openCount++; @@ -132,9 +187,11 @@ InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclus OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue) { - OutputLink^ link = gcnew OutputLink (this, targetQueue); + lock ocl(openCloseLock); + lock l(sessionLock); + CheckOpen(); - lock l(waiters); + OutputLink^ link = gcnew OutputLink (this, targetQueue); if (sessionImplp == NULL) { // not needed unless sending messages @@ -144,6 +201,7 @@ OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue) } if (openCount == 0) { + l.release(); connection->NotifyBusy(); } openCount++; @@ -155,7 +213,7 @@ OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue) // called whenever a child InputLink or OutputLink is closed or finalized void AmqpSession::NotifyClosed() { - lock l(waiters); + lock ocl(openCloseLock); openCount--; if (openCount == 0) { connection->NotifyIdle(); @@ -165,9 +223,14 @@ void AmqpSession::NotifyClosed() CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state) { - lock l(waiters); - if (sessionp == NULL) - throw gcnew ObjectDisposedException("Send"); + lock l(sessionLock); + + // delimit with session dtx commands depending on the transaction context + UpdateTransactionState(%l); + + CheckOpen(); + + bool syncPending = false; // create an AMQP message.transfer command to use with the partial frameset from the MessageBodyStream @@ -191,26 +254,34 @@ CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodySt // waiter is responsible for releasing the Future native resource futurep = NULL; addWaiter(waiter); - } - - l.release(); - - if (waiter != nullptr) return waiter; + } // synchronous send with no timeout: no need to involve the asyncHelper thread + IncrementSyncs(); + syncPending = true; + l.release(); internalWaitForCompletion((IntPtr) futurep); } finally { + if (syncPending) { + if (!l.is_locked()) + l.acquire(); + DecrementSyncs(); + } if (futurep != NULL) delete (futurep); } return nullptr; } + void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey) { + lock l(sessionLock); + CheckOpen(); + sessionp->exchangeBind(arg::queue=QpidMarshal::ToNative(queue), arg::exchange=QpidMarshal::ToNative(exchange), arg::bindingKey=QpidMarshal::ToNative(filterKey)); @@ -220,14 +291,8 @@ void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System:: void AmqpSession::internalWaitForCompletion(IntPtr fp) { - lock l(waiters); - if (sessionp == NULL) - throw gcnew ObjectDisposedException("AmqpSession"); + Debug::Assert(syncCount > 0, "sync counter mismatch"); - // increment the smart pointer count to sessionImplp to guard agains async close - Session sessionCopy(*sessionp); - - l.release(); // Qpid native lib call to wait for the command completion ((Future *)fp.ToPointer())->wait(*sessionImplp); } @@ -235,6 +300,7 @@ void AmqpSession::internalWaitForCompletion(IntPtr fp) // call with lock held void AmqpSession::addWaiter(CompletionWaiter^ waiter) { + IncrementSyncs(); waiters->Add(waiter); if (!helperRunning) { helperRunning = true; @@ -247,13 +313,14 @@ void AmqpSession::removeWaiter(CompletionWaiter^ waiter) { // a waiter can be removed from anywhere in the list if timed out - lock l(waiters); + lock l(sessionLock); int idx = waiters->IndexOf(waiter); if (idx == -1) { // TODO: assert or log } else { waiters->RemoveAt(idx); + DecrementSyncs(); } } @@ -262,7 +329,7 @@ void AmqpSession::removeWaiter(CompletionWaiter^ waiter) void AmqpSession::asyncHelper(Object ^unused) { - lock l(waiters); + lock l(sessionLock); while (true) { if (waiters->Count == 0) { @@ -279,27 +346,267 @@ void AmqpSession::asyncHelper(Object ^unused) } } -bool AmqpSession::MessageStop(Completion &comp, std::string &name) +bool AmqpSession::MessageStop(std::string &name) { - lock l(waiters); + lock l(sessionLock); - if (sessionp == NULL) + if (closing) return false; - comp = sessionp->messageStop(name, true); + sessionp->messageStop(name, true); return true; } void AmqpSession::AcceptAndComplete(SequenceSet& transfers) { - lock l(waiters); + lock l(sessionLock); + + // delimit with session dtx commands depending on the transaction context + UpdateTransactionState(%l); - if (sessionp == NULL) - throw gcnew ObjectDisposedException("Accept"); + CheckOpen(); sessionp->markCompleted(transfers, false); sessionp->messageAccept(transfers, false); } +// call with session lock held + +void AmqpSession::UpdateTransactionState(lock^ slock) +{ + Transaction^ currentTx = Transaction::Current; + if ((currentTx == nullptr) && !dtxEnabled) { + // no transaction scope and no previous dtx work to monitor + return; + } + + if (currentTx == openSystemTransaction) { + // no change + return; + } + + if (!dtxEnabled) { + // AMQP requires that this be the first dtx-related command on the session + sessionp->dtxSelect(false); + dtxEnabled = true; + pendingTransactions = gcnew Collections::Generic::List(); + } + + bool notify = false; // unless the System.Transaction is no longer active + XaTransaction^ oldXaTx = openXaTransaction; + if (openSystemTransaction != nullptr) { + // The application may start a new transaction before the phase0 on rollback + try { + if (openSystemTransaction->TransactionInformation->Status != TransactionStatus::Active) { + notify = true; + } + } catch (System::ObjectDisposedException^) { + notify = true; + } + } + + slock->release(); + // only use stack variables until lock re-acquired + + if (notify) { + // will do call back to all enlisted sessions. call with session lock released. + // If NotifyPhase0() wins the race to start phase 0, openXaTransaction will be null + oldXaTx->NotifyPhase0(); + } + + XaTransaction^ newXaTx = nullptr; + if (currentTx != nullptr) { + // This must be called with locks released. The DTC and System.Transactions methods that + // will be called hold locks that interfere with the ITransactionResourceAsync callbacks. + newXaTx = DtxResourceManager::GetXaTransaction(this, currentTx); + } + + slock->acquire(); + + if (closing) + return; + + if (openSystemTransaction != nullptr) { + // some other transaction has the dtx window open + // close the XID window, suspend = true... in case it is used again + sessionp->dtxEnd(getXid(openXaTransaction), false, true, false); + openSystemTransaction = nullptr; + openXaTransaction = nullptr; + } + + + // Call enlist with session lock held. The XaTransaction will call DtxStart before returning. + if (newXaTx != nullptr) { + if (!pendingTransactions->Contains(newXaTx)) { + pendingTransactions->Add(newXaTx); + } + + newXaTx->Enlist(this); + } + + openXaTransaction = newXaTx; + openSystemTransaction = currentTx; +} + + +typedef TypedResult XaResultCompletion; + + +// send the required closing dtx.End before Phase 1 + +IntPtr AmqpSession::BeginPhase0Flush(XaTransaction ^xaTx) { + + lock l(sessionLock); + IntPtr completionp = IntPtr::Zero; + try { + if (sessionp != NULL) { + + // proceed even if "closing == true", the phase 0 is part of the transition from closing to closed + + if (xaTx != openXaTransaction) { + // a different transaction (or none) is in scope, so xaTx was previously suspended. + // must re-open it to close it properly + if (openXaTransaction != nullptr) { + // suspend the session's current pending transaction + // it wil be reopened in a future enlistment or phase 0 flush. + sessionp->dtxEnd(getXid(openXaTransaction), false, true, false); + } + // resuming + sessionp->dtxStart(getXid(xaTx), false, true, false); + } + + // the closing (i.e. non-suspended) dtxEnd happens here (exactly once for a given transaction) + // set the sync bit since phase0 is a precondition to prepare or abort + completionp = (IntPtr) new XaResultCompletion(sessionp->dtxEnd(getXid(xaTx), false, false, true)); + IncrementSyncs(); + } + } + catch (System::Exception^ ) { + // all the caller wants to know is if completionp is non-null + } + + openXaTransaction = nullptr; + openSystemTransaction = nullptr; + return completionp; +} + + +void AmqpSession::EndPhase0Flush(XaTransaction ^xaTx, IntPtr intptr) { + XaResultCompletion *completionp = (XaResultCompletion *) intptr.ToPointer(); + lock l(sessionLock); + + if (completionp != NULL) { + try { + l.release(); + completionp->wait(); + pendingTransactions->Remove(xaTx); + } + catch (System::Exception^) { + // connection closed or network drop + } + finally { + l.acquire(); + DecrementSyncs(); + delete completionp; + } + } +} + + +IntPtr AmqpSession::DtxStart(IntPtr ip, bool join, bool resume) { + // called with session lock held (as a callback from the Enlist()) + // The XaTransaction knows if this should be the originating dtxStart, or a join/resume + IntPtr rv = IntPtr::Zero; + qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer(); + if (join || resume) { + sessionp->dtxStart(*xidp, join, resume, false); + } + else { + // The XaTransaction needs to track when the first dtxStart completes to safely request a join + IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs + rv = (IntPtr) new XaResultCompletion(sessionp->dtxStart(*xidp, join, resume, false)); + } + + return rv; +} + + +IntPtr AmqpSession::DtxPrepare(IntPtr ip) { + qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer(); + lock l(sessionLock); + + if (closing) + return IntPtr::Zero; + + IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs + return (IntPtr) new XaResultCompletion(sessionp->dtxPrepare(*xidp, true)); +} + + +IntPtr AmqpSession::DtxCommit(IntPtr ip, bool onePhase) { + qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer(); + lock l(sessionLock); + + if (closing) + return IntPtr::Zero; + + IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs + return (IntPtr) new XaResultCompletion(sessionp->dtxCommit(*xidp, onePhase, true)); +} + + +IntPtr AmqpSession::DtxRollback(IntPtr ip) { + qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer(); + lock l(sessionLock); + if (closing) + return IntPtr::Zero; + + IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs + + return (IntPtr) new XaResultCompletion(sessionp->dtxRollback(*xidp, true)); +} + + +//call with lock held +void AmqpSession::IncrementSyncs() { + syncCount++; +} + + +//call with lock held +void AmqpSession::DecrementSyncs() { + syncCount--; + Debug::Assert(syncCount >= 0, "sync counter underrun"); + if (syncCount == 0) { + if (closeWaitHandle != nullptr) { + // now OK to move from closing to closed + closeWaitHandle->Set(); + } + } +} + + +// call with lock held +void AmqpSession::WaitLastSync(lock ^l) { + if (syncCount == 0) + return; + if (AppDomain::CurrentDomain->IsFinalizingForUnload()) { + // a wait would be a hang. No more syncs coming + return; + } + if (closeWaitHandle == nullptr) + closeWaitHandle = gcnew ManualResetEvent(false); + l->release(); + closeWaitHandle->WaitOne(); + l->acquire(); +} + + +void AmqpSession::ReleaseCompletion(IntPtr completion) { + lock l(sessionLock); + DecrementSyncs(); + delete completion.ToPointer(); +} + }}} // namespace Apache::Qpid::Cli diff --git a/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/wcf/src/Apache/Qpid/Interop/AmqpSession.h index 8306cdf720..88ffd18dcc 100644 --- a/wcf/src/Apache/Qpid/Interop/AmqpSession.h +++ b/wcf/src/Apache/Qpid/Interop/AmqpSession.h @@ -29,29 +29,50 @@ namespace Interop { using namespace System; using namespace System::Runtime::InteropServices; +using namespace System::Transactions; +using namespace System::Diagnostics; + using namespace qpid::client; using namespace std; ref class InputLink; ref class OutputLink; +ref class XaTransaction; public ref class AmqpSession { private: + Object^ sessionLock; + Object^ openCloseLock; AmqpConnection^ connection; - Connection* connectionp; AsyncSession* sessionp; SessionImpl* sessionImplp; SubscriptionManager* subs_mgrp; - LocalQueue* localQueuep; Collections::Generic::List^ waiters; bool helperRunning; + + // number of active InputLinks and OutputLinks int openCount; + // the number of async commands sent to the broker that need completion confirmation + int syncCount; + + bool closing; + ManualResetEvent^ closeWaitHandle; + bool dtxEnabled; + Transaction^ openSystemTransaction; + XaTransaction^ openXaTransaction; + Collections::Generic::List^ pendingTransactions; + void Cleanup(); + void CheckOpen(); void asyncHelper(Object ^); void addWaiter(CompletionWaiter^ waiter); + void UpdateTransactionState(msclr::lock^ sessionLock); + void IncrementSyncs(); + void DecrementSyncs(); + void WaitLastSync(msclr::lock^ l); public: OutputLink^ CreateOutputLink(System::String^ targetQueue); @@ -66,16 +87,21 @@ internal: void NotifyClosed(); CompletionWaiter^ SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state); void ConnectionClosed(); - void internalWaitForCompletion(IntPtr Future); + void internalWaitForCompletion(IntPtr future); void removeWaiter(CompletionWaiter^ waiter); - bool MessageStop(Completion &comp, std::string &name); + bool MessageStop(std::string &name); void AcceptAndComplete(SequenceSet& transfers); + IntPtr BeginPhase0Flush(XaTransaction^); + void EndPhase0Flush(XaTransaction^, IntPtr); + IntPtr DtxStart(IntPtr xidp, bool, bool); + IntPtr DtxPrepare(IntPtr xidp); + IntPtr DtxCommit(IntPtr xidp, bool onePhase); + IntPtr DtxRollback(IntPtr xidp); + void ReleaseCompletion(IntPtr completion); property AmqpConnection^ Connection { AmqpConnection^ get () { return connection; } } - - }; }}} // namespace Apache::Qpid::Interop diff --git a/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp b/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp new file mode 100644 index 0000000000..6ea31f8401 --- /dev/null +++ b/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp @@ -0,0 +1,285 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/framing/FrameSet.h" + +#include "AmqpConnection.h" +#include "AmqpSession.h" +#include "DtxResourceManager.h" +#include "XaTransaction.h" +#include "QpidException.h" +#include "QpidMarshal.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Transactions; +using namespace msclr; + + +/* + * There is one DtxResourceManager per broker and per application process. + * + * Each RM manages a collection of active XaTransaction objects. Participating AmqpSessions enlist + * (or re-enlist) with an XaTransaction indexed by the corresponding System.Transaction object. The + * RM maintains its own AmqpSession for sending 2PC commnds (dtxPrepare, dtxCommit etc.). The + * XaTransaction object works through the lifecycle of the Transaction, including prompting the + * enlisted sessions to send their delimiting dtxEnd commands. + * + * A separate DtcPlugin.cpp file provides the recovery logic when needed in a library named + * qpidxarm.dll. The MSDTC maintans recovery info in its log and tracks when there may be + * transactions in doubt. See the documentation for IDtcToXaHelperSinglePipe. + * + * To enable transaction support: + * DTC requires a registry key to find the plugin + * [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\MSDTC\XADLL] qpidxarm.dll -> [path to qpidxarm.dll] + * DTC needs to be configured for XA + * cmdprompt -> dcomcnfg -> Component services -> My Computer -> DTC -> Local DTC -> right click properties -> Security -> Enable XA Transactions + * + */ + +// TODO: provide shutdown mechanism, perhaps callback from Connection Idle for enlisted connections. +// But note that a new RM registration with the DTC is very expensive. + + +DtxResourceManager::DtxResourceManager(AmqpConnection^ appConnection) { + dtcComp = NULL; + xaHelperp = NULL; + rmCookie = 0; + doubtCount = 0; + tmDown = false; + AmqpConnection^ clonedCon = appConnection->Clone(); + dtxControlSession = clonedCon->CreateSession(); + dataSourceName = clonedCon->DataSourceName; + transactionMap = gcnew Collections::Generic::Dictionary(); + + HRESULT hr; + + try { + // instead of pinning this instance, just use tmp stack variables for small stuff + IUnknown* tmp = NULL; + // request the default DTC + hr = DtcGetTransactionManager(NULL, NULL, IID_IUnknown, 0, 0, 0, (void **)&tmp); + if (hr != S_OK) + throw gcnew QpidException("connection failure to DTC service"); + dtcComp = tmp; + + IDtcToXaHelperSinglePipe *tmp2 = NULL; + hr = ((IUnknown *)dtcComp)->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void**) &tmp2); + if (hr != S_OK) + throw gcnew QpidException("DTC XA unavailable"); + xaHelperp = tmp2; + + std::string native_dsn = QpidMarshal::ToNative(dataSourceName); + DWORD tmp3; + + // This call doesn't return until the DTC has opened and closed a connection to the broker + // and written a recovery entry in its log. + hr = ((IDtcToXaHelperSinglePipe *) xaHelperp)->XARMCreate(const_cast(native_dsn.c_str()), "qpidxarm.dll", &tmp3); + if (hr != S_OK) { + switch (hr) { + case E_FAIL: + throw gcnew QpidException("Resource Manager DLL configuration error"); + case E_INVALIDARG: + throw gcnew QpidException("Resource Manager internal error"); + case E_OUTOFMEMORY: + throw gcnew QpidException("Resource Manager out of memory"); + case E_UNEXPECTED: + throw gcnew QpidException("Resource Manager internal failure"); + case XACT_E_TMNOTAVAILABLE: + case XACT_E_CONNECTION_DOWN: + throw gcnew QpidException("MSDTC unavailable"); + + default: + throw gcnew QpidException("Resource Manager Registration failed"); + } + } + + rmCookie = tmp3; + } + finally { + if (rmCookie == 0) { + // undo partial construction + Cleanup(); + } + } +} + + +DtxResourceManager::!DtxResourceManager() { + Cleanup(); +} + + +DtxResourceManager::~DtxResourceManager() { + GC::SuppressFinalize(this); + Cleanup(); +} + + +// Called when the DTC COM proxy sends TMDOWN to a pending XaTransaction +// called once for each outstanding tx + +void DtxResourceManager::TmDown() { + // this block is the only place where both locks are held + lock l1(transactionMap); + lock l2(resourceManagerMap); + if (tmDown) + return; + + tmDown = true; + resourceManagerMap->Remove(this->dataSourceName); + // defer cleanup until last TmDown notification received +} + + + +void DtxResourceManager::Cleanup() { + for each (Collections::Generic::KeyValuePair kvp in transactionMap) { + XaTransaction^ xaTr = kvp.Value; + xaTr->ChildFinalize(); + } + + try { + if (rmCookie != 0) { + // implies no recovery needed + bool cleanSession = (doubtCount == 0) && (transactionMap->Count == 0); + ((IDtcToXaHelperSinglePipe *)xaHelperp)->ReleaseRMCookie(rmCookie, cleanSession); + rmCookie = 0; + } + + + if (xaHelperp != NULL) { + ((IDtcToXaHelperSinglePipe *) xaHelperp)->Release(); + xaHelperp = NULL; + } + + if (dtcComp != NULL) { + ((IUnknown *) dtcComp)->Release(); + dtcComp = NULL; + } + + if (dtxControlSession != nullptr) { + dtxControlSession->Connection->Close(); + } + + } + catch (Exception^) {} +} + + +XaTransaction^ DtxResourceManager::GetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) { + // find or create the RM instance associated with the session's broker + AmqpConnection^ connection = appSession->Connection; + DtxResourceManager^ instance = connection->CachedResourceManager; + + // try cached rm first + if (instance != nullptr) { + XaTransaction^ xaTx = instance->InternalGetXaTransaction(appSession, transaction); + if (xaTx != nullptr) + return xaTx; + else { + // cached version no longer available, force new rm creation + connection->CachedResourceManager = nullptr; + } + } + + lock l(resourceManagerMap); + String^ dsn = connection->DataSourceName; + if (!resourceManagerMap->TryGetValue(dsn, instance)) { + instance = gcnew DtxResourceManager(connection->Clone()); + resourceManagerMap->Add(dsn, instance); + connection->CachedResourceManager = instance; + } + l.release(); + + return instance->InternalGetXaTransaction(appSession, transaction); +} + + +XaTransaction^ DtxResourceManager::InternalGetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) { + // find or create the tx proxy instance associated with the DTC transaction + lock l(transactionMap); + if (tmDown) + return nullptr; + + XaTransaction^ xaTransaction = nullptr; + if (!transactionMap->TryGetValue(transaction, xaTransaction)) { + xaTransaction = gcnew XaTransaction(transaction, (IDtcToXaHelperSinglePipe *) xaHelperp, rmCookie, this); + transactionMap->Add(transaction, xaTransaction); + } + + return xaTransaction; +} + +void DtxResourceManager::Complete(Transaction ^tx) { + lock l(transactionMap); + transactionMap->Remove(tx); + + if (tmDown && (transactionMap->Count == 0)) { + // no more activity on this instance + GC::SuppressFinalize(this); + Cleanup(); + } +} + + +void DtxResourceManager::IncrementDoubt() { + Interlocked::Increment(doubtCount); +} + + +void DtxResourceManager::DecrementDoubt() { + Interlocked::Decrement(doubtCount); +} + + +#ifdef QPID_RECOVERY_TEST_HOOK +void DtxResourceManager::ForceRecovery(Transaction ^tx) { + lock l(resourceManagerMap); + for each (Collections::Generic::KeyValuePair kvp in resourceManagerMap) { + + Collections::Generic::Dictionary^ txmap = kvp.Value->transactionMap; + XaTransaction^ xaTransaction = nullptr; + lock l2(txmap); + if (txmap->TryGetValue(tx, xaTransaction)) { + xaTransaction->ForceRecovery(); + } + } +} +#endif + +}}} // namespace Apache::Qpid::Interop diff --git a/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h b/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h new file mode 100644 index 0000000000..7df491eec2 --- /dev/null +++ b/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h @@ -0,0 +1,76 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace System::Transactions; + +ref class XaTransaction; + +public ref class DtxResourceManager +{ +private: + // Receive() or WaitForMessage() + AmqpSession^ dtxControlSession; + String^ dataSourceName; + bool consumed; + DWORD rmCookie; + void* xaHelperp; + void* dtcComp; + int doubtCount; + DtxResourceManager(AmqpConnection^); + XaTransaction^ InternalGetXaTransaction (AmqpSession^ session, Transaction^ transaction); + bool tmDown; + + // The active transactions + Collections::Generic::Dictionary^ transactionMap; + + // one resource manager per AMQP broker per process + static Collections::Generic::Dictionary^ resourceManagerMap = + gcnew Collections::Generic::Dictionary(); + + void Cleanup(); + ~DtxResourceManager(); + !DtxResourceManager(); + +internal: + static XaTransaction^ GetXaTransaction (AmqpSession^ session, Transaction^ transaction); + void Complete(Transaction ^tx); + void TmDown(); + + property AmqpSession^ DtxControlSession { + AmqpSession^ get () { return dtxControlSession; } + } + + void IncrementDoubt(); + void DecrementDoubt(); + +#ifdef QPID_RECOVERY_TEST_HOOK +public: + static void ForceRecovery(Transaction ^tx); +#endif +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/wcf/src/Apache/Qpid/Interop/InputLink.cpp index e12151d943..3245cd3540 100644 --- a/wcf/src/Apache/Qpid/Interop/InputLink.cpp +++ b/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -86,6 +86,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, System::Exception^ linkException = nullptr; waiters = gcnew Collections::Generic::List(); + linkLock = waiters; // private and available + subscriptionLock = gcnew Object(); try { std::string qname = QpidMarshal::ToNative(sourceQueue); @@ -120,10 +122,13 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, } } +// called with lock held void InputLink::ReleaseNative() { // involves talking to the Broker unless the connection is broken - if (subscriptionp != NULL) { + + if ((subscriptionp != NULL) && !finalizing) { + // TODO: find boost time error on cleanup when in finalizer thread try { subscriptionp->cancel(); } @@ -134,20 +139,31 @@ void InputLink::ReleaseNative() } // free native mem (or smart pointers) that we own - if (subscriptionp != NULL) + if (subscriptionp != NULL) { delete subscriptionp; - if (queuePtrp != NULL) + subscriptionp = NULL; + } + if (queuePtrp != NULL) { delete queuePtrp; - if (localQueuep != NULL) - delete localQueuep; - if (dequeuedFrameSetpp != NULL) + queuePtrp = NULL; + } + if (localQueuep != NULL) { + if (!finalizing) { + // TODO: find boost time error on cleanup when in finalizer thread + delete localQueuep; + localQueuep = NULL; + } + } + if (dequeuedFrameSetpp != NULL) { delete dequeuedFrameSetpp; + dequeuedFrameSetpp = NULL; + } } void InputLink::Cleanup() { { - lock l(waiters); + lock l(linkLock); if (disposed) return; @@ -162,6 +178,9 @@ void InputLink::Cleanup() if (queuePtrp != NULL) (*queuePtrp)->close(); + // wait for any sync operations on the subscription to complete before ReleaseNative + lock l2(subscriptionLock); + try {} finally { @@ -179,6 +198,7 @@ InputLink::~InputLink() InputLink::!InputLink() { + finalizing = true; Cleanup(); } @@ -204,7 +224,7 @@ bool InputLink::haveMessage() IntPtr InputLink::nextLocalMessage() { - lock l(waiters); + lock l(linkLock); if (disposed) return (IntPtr) NULL; @@ -250,7 +270,7 @@ IntPtr InputLink::nextLocalMessage() void InputLink::unblockWaiter() { // to be followed by resetQueue() below - lock l(waiters); + lock l(linkLock); if (disposed) return; (*queuePtrp)->close(); @@ -264,7 +284,7 @@ void InputLink::unblockWaiter() void InputLink::resetQueue() { - lock l(waiters); + lock l(linkLock); if (disposed) return; if ((*queuePtrp)->isClosed()) { @@ -282,7 +302,7 @@ bool InputLink::internalWaitForMessage() bool received = false; QpidFrameSetPtr* frameSetpp = NULL; try { - lock l(waiters); + lock l(linkLock); if (disposed) return false; if (haveMessage()) @@ -348,7 +368,7 @@ void InputLink::addWaiter(MessageWaiter^ waiter) void InputLink::removeWaiter(MessageWaiter^ waiter) { // a waiter can be removed from anywhere in the list if timed out - lock l(waiters); + lock l(linkLock); int idx = waiters->IndexOf(waiter); if (idx == -1) { // TODO: assert or log @@ -388,7 +408,7 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) { void InputLink::asyncHelper() { - lock l(waiters); + lock l(linkLock); while (true) { if (disposed && (waiters->Count == 0)) { @@ -419,14 +439,14 @@ void InputLink::asyncHelper() void InputLink::sync() { - // for the timeout thread - lock l(waiters); + // used by the MessageWaiter timeout thread to not run before fully initialized + lock l(linkLock); } void InputLink::PrefetchLimit::set(int value) { - lock l(waiters); + lock l(linkLock); prefetchLimit = value; int delta = 0; @@ -475,31 +495,32 @@ void InputLink::AdjustCredit() void InputLink::SyncCredit(Object ^unused) { - lock l(waiters); + lock l(linkLock); try { if (disposed) return; - Completion comp; - if (!amqpSession->MessageStop(comp, subscriptionp->getName())) { + if (!amqpSession->MessageStop(subscriptionp->getName())) { // connection closed return; } - // get a private scoped copy to use outside the lock - Subscription s(*subscriptionp); - l.release(); // use setFlowControl to re-enable credit flow on the broker. - // previously used comp.wait() here, but setFlowControl is a sync operation - s.setFlowControl(s.getSettings().flowControl); + // setFlowControl is a sync operation + { + lock l2(subscriptionLock); + if (subscriptionp != NULL) { + subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl); + } + } l.acquire(); if (disposed) return; - // let existing waiters use up any + // let existing waiters use up any messages that arrived. // local queue size can only decrease until more credit is issued while (true) { if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { @@ -700,7 +721,7 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage) { - lock l(waiters); + lock l(linkLock); if (waiters->Count == 0) { // see if there is a message already available without blocking @@ -740,7 +761,7 @@ IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callba //TODO: if haveMessage() complete synchronously - lock l(waiters); + lock l(linkLock); MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state); addWaiter(waiter); return waiter; @@ -779,7 +800,10 @@ bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMess bool InputLink::WaitForMessage(TimeSpan timeout) { - lock l(waiters); + lock l(linkLock); + + if (disposed) + return false; if (waiters->Count == 0) { // see if there is a message already available without blocking @@ -799,12 +823,12 @@ bool InputLink::WaitForMessage(TimeSpan timeout) return false; } - return true; + return haveMessage(); } IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state) { - lock l(waiters); + lock l(linkLock); // Same as for BeginTryReceive, except consuming = false MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state); @@ -822,7 +846,7 @@ bool InputLink::EndWaitForMessage(IAsyncResult^ result) return false; } - return true; + return haveMessage(); } diff --git a/wcf/src/Apache/Qpid/Interop/InputLink.h b/wcf/src/Apache/Qpid/Interop/InputLink.h index f59a03a8c3..2f96b91944 100644 --- a/wcf/src/Apache/Qpid/Interop/InputLink.h +++ b/wcf/src/Apache/Qpid/Interop/InputLink.h @@ -45,6 +45,8 @@ private: Collections::Generic::List^ waiters; bool disposed; bool finalizing; + Object^ linkLock; + Object^ subscriptionLock; QpidFrameSetPtr* dequeuedFrameSetpp; ManualResetEvent^ asyncHelperWaitHandle; // number of messages to buffer locally for future consumption diff --git a/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/wcf/src/Apache/Qpid/Interop/Interop.vcproj index b662be9d54..c2d6b30fff 100644 --- a/wcf/src/Apache/Qpid/Interop/Interop.vcproj +++ b/wcf/src/Apache/Qpid/Interop/Interop.vcproj @@ -83,7 +83,7 @@ /> + + + + + + + + + diff --git a/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp b/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp new file mode 100644 index 0000000000..23743316ff --- /dev/null +++ b/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp @@ -0,0 +1,525 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/Xid.h" + +#include "QpidException.h" +#include "AmqpConnection.h" +#include "AmqpSession.h" +#include "DtxResourceManager.h" +#include "XaTransaction.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Transactions; +using namespace msclr; + +using namespace qpid::framing::dtx; + +// ------------------------------------------------------------------------ +// Start of a pure native code section +#pragma unmanaged +// ------------------------------------------------------------------------ + +// This is the native COM object the DTC expects to talk to for coordination. +// There is exactly one native instance of this for each managed XaTransaction object. + + +class DtcCallbackHandler : public ITransactionResourceAsync +{ +private: + long useCount; + DtcCallbackFp managedCallback; +public: + ITransactionEnlistmentAsync *txHandle; + DtcCallbackHandler(DtcCallbackFp cbp) : managedCallback(cbp), useCount(0) {} + ~DtcCallbackHandler() {} + virtual HRESULT __stdcall PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase); + virtual HRESULT __stdcall CommitRequest(DWORD grfrm, XACTUOW *unused); + virtual HRESULT __stdcall AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3); + + virtual HRESULT __stdcall TMDown(); + virtual HRESULT __stdcall DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject); + virtual ULONG __stdcall DtcCallbackHandler::AddRef(); + virtual ULONG __stdcall DtcCallbackHandler::Release(); + void __stdcall AbortRequestDone(); +}; + + +HRESULT DtcCallbackHandler::PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase) +{ + if (singlePhase) { + return managedCallback(DTC_SINGLE_PHASE) ? S_OK : E_FAIL; + } + + return managedCallback(DTC_PREPARE) ? S_OK : E_FAIL; +} + + +HRESULT DtcCallbackHandler::CommitRequest(DWORD grfrm, XACTUOW *unused) +{ + return managedCallback(DTC_COMMIT) ? S_OK : E_FAIL; +} + +HRESULT DtcCallbackHandler::AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3) +{ + return managedCallback(DTC_ABORT) ? S_OK : E_FAIL; +} + + +HRESULT DtcCallbackHandler::TMDown() +{ + return managedCallback(DTC_TMDOWN) ? S_OK : E_FAIL; +} + + +HRESULT DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject) +{ + *ppvObject = NULL; + + if ((riid == IID_IUnknown) || (riid == IID_IResourceManagerSink)) + *ppvObject = this; + else + return ResultFromScode(E_NOINTERFACE); + + this->AddRef(); + return S_OK; +} + + +ULONG DtcCallbackHandler::AddRef() +{ + return InterlockedIncrement(&useCount); +} + + +ULONG DtcCallbackHandler::Release() +{ + long uc = InterlockedDecrement(&useCount); + + if (uc) + return uc; + + delete this; + return 0; +} + + +// ------------------------------------------------------------------------ +// End of pure native code section +#pragma managed +// ------------------------------------------------------------------------ + +#ifdef QPID_RECOVERY_TEST_HOOK +void XaTransaction::ForceRecovery() { + debugFailMode = true; +} +#endif + +// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ + + +XaTransaction::XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *xaHelperp, DWORD rmCookie, DtxResourceManager^ rm) { + bool success = false; + xidp = NULL; + commandCompletionp = NULL; + firstDtxStartCompletionp = NULL; + nativeHandler = NULL; + resourceManager = rm; + controlSession = rm->DtxControlSession; + active = true; + preparing = false; + systemTransaction = t; + IntPtr comTxp = IntPtr::Zero; + completionHandle = gcnew ManualResetEvent(false); + + try { + enlistedSessions = gcnew Collections::Generic::List(); + + // take a System.Transactions.Transaction and obtain + // the corresponding DTC COM object. + IDtcTransaction^ dtcTransaction = TransactionInterop::GetDtcTransaction(t); + comTxp = Marshal::GetIUnknownForObject(dtcTransaction); + XID winXid; + HRESULT hr = xaHelperp->ConvertTridToXID((DWORD *)comTxp.ToPointer(), rmCookie, &winXid); + if (hr != S_OK) + throw gcnew QpidException("get XA XID"); + + // Convert the X/Open format to the internal Qpid format + xidp = new qpid::framing::Xid(); + xidp->setFormat((uint32_t) winXid.formatID); + int bqualPos = 0; + if (winXid.gtrid_length > 0) { + xidp->setGlobalId(std::string(winXid.data, winXid.gtrid_length)); + bqualPos = winXid.gtrid_length; + } + if (winXid.bqual_length > 0) { + xidp->setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length)); + } + + // create the callback chain: DTC proxy -> DtcCallbackHandler -> this + inboundDelegate = gcnew DtcCallbackDelegate(this, &XaTransaction::DtcCallback); + IntPtr ip = Marshal::GetFunctionPointerForDelegate(inboundDelegate); + nativeHandler = new DtcCallbackHandler(static_cast(ip.ToPointer())); + // add myself for later smart pointer destruction + nativeHandler->AddRef(); + + hr = xaHelperp->EnlistWithRM(rmCookie, (ITransaction *)comTxp.ToPointer(), nativeHandler, &(nativeHandler->txHandle)); + + if (hr != S_OK) + throw gcnew QpidException("Enlist"); + + success = true; + } + finally { + if (!success) + Cleanup(); + if (comTxp != IntPtr::Zero) + ((IUnknown *) comTxp.ToPointer())->Release(); + } +} + + +void XaTransaction::Cleanup() { + if (firstDtxStartCompletionp != NULL) { + try { + firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); + } + catch (...) { + // TODO: log it? + } + + firstDtxStartCompletionp = NULL; + } + + if (nativeHandler != NULL) { + nativeHandler->Release(); + nativeHandler = NULL; + } + if (xidp != NULL) { + delete xidp; + xidp = NULL; + } +} + + +XaTransaction^ XaTransaction::Enlist (AmqpSession ^session) { + lock l(enlistedSessions); + if (!active) + throw gcnew QpidException("transaction enlistment internal error"); + if (!enlistedSessions->Contains(session)) { + enlistedSessions->Add(session); + if (firstEnlistedSession == nullptr) { + firstEnlistedSession = session; + IntPtr intptr = session->DtxStart((IntPtr) xidp, false, false); + firstDtxStartCompletionp = (TypedResult *) intptr.ToPointer(); + } + else { + // the broker must see the dtxStart as a join operation, and it must arrive + // at the broker after the first dtx start + if (firstDtxStartCompletionp != NULL) + firstDtxStartCompletionp->wait(); + session->DtxStart((IntPtr) xidp, true, false); + } + } + else { + // already started once, so resume is true + session->DtxStart((IntPtr) xidp, false, true); + } + return this; +} + + +void XaTransaction::SessionClosing(AmqpSession^ session) { + lock l(enlistedSessions); + if (!enlistedSessions->Contains(session)) + return; + + enlistedSessions->Remove(session); + if (!active) { + // Phase0Flush already done on all sessions + l.release(); + return; + } + + IntPtr completion = session->BeginPhase0Flush(this); + session->EndPhase0Flush(this, completion); + + if (session == firstEnlistedSession) { + // if we just completed the dtxEnd, we know the dtxStart completed before that + if (firstDtxStartCompletionp != NULL) { + firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); + firstDtxStartCompletionp = NULL; + } + } +} + + +void XaTransaction::Phase0Flush() { + // let each session delimit their transactional work with an AMQP dtx.end protocol frame + lock l(enlistedSessions); + if (!active) + return; + + active = false; // no more enlistments + int scount = enlistedSessions->Count; + + if (scount > 0) { + array ^completions = gcnew array(scount); + for (int i = 0; i < scount; i++) { + + // TODO: skip phase0 flush for rollback case + + completions[i] = enlistedSessions[i]->BeginPhase0Flush(this); + } + + for (int i = 0; i < scount; i++) { + // without each session.sync(), session commands are queued up in the right order, + // but on their separate outbound channels, and destined for receipt at separate Broker inbound + // channels. It is not clear how to be sure Phase 0 dtx.End is processed in the + // correct order before commit on the broker without the sync. + enlistedSessions[i]->EndPhase0Flush(this, completions[i]); + } + } + + // since all dtxEnds have completed, we know all starts have too + if (firstDtxStartCompletionp != NULL) { + try { + firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp); + } + catch (...) { + // TODO: log it? + } + + firstDtxStartCompletionp = NULL; + } +} + + +bool XaTransaction::DtcCallback (DtcCallbackType callback) { + // called by the DTC proxy thread. Be brief and don't block (but Phase0Flush?) + + if (AppDomain::CurrentDomain->IsFinalizingForUnload()) + return false; + + IntPtr intptr = IntPtr::Zero; + currentCommand = callback; + + try { + switch (callback) { + case DTC_PREPARE: + Phase0Flush(); + try { + intptr = controlSession->DtxPrepare((IntPtr) xidp); + preparing = true; + resourceManager->IncrementDoubt(); + } + catch (System::Exception^ ) { + // intptr remains nullptr + } + commandCompletionp = (TypedResult *) intptr.ToPointer(); + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); + break; + + case DTC_COMMIT: +#ifdef QPID_RECOVERY_TEST_HOOK + if (debugFailMode){ return; } +#endif + // no phase 0 required. always preceded by a prepare + try { + intptr = controlSession->DtxCommit((IntPtr) xidp, false); + } + catch (System::Exception^ ) { + // intptr remains nullptr + } + commandCompletionp = (TypedResult *) intptr.ToPointer(); + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); + break; + + case DTC_ABORT: + Phase0Flush(); +#ifdef QPID_RECOVERY_TEST_HOOK + if (debugFailMode){ return; } +#endif + try { + intptr = controlSession->DtxRollback((IntPtr) xidp); + } + catch (System::Exception^ ) { + // intptr remains nullptr + } + commandCompletionp = (TypedResult *) intptr.ToPointer(); + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); + break; + + case DTC_SINGLE_PHASE: + Phase0Flush(); + try { + intptr = controlSession->DtxCommit((IntPtr) xidp, true); + } + catch (System::Exception^ ) { + // intptr remains nullptr + } + commandCompletionp = (TypedResult *) intptr.ToPointer(); + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); + break; + + case DTC_TMDOWN: + commandCompletionp = NULL; + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter)); + break; + } + return true; + } + catch (System::Exception^ e) { + // TODO: log it + Console::WriteLine("Unexpected DtcCallback exception: {0}", e->ToString()); + } + catch (...) { + // TODO: log it + } + return false; +} + + +// this handles the case where the application regains control for +// a new transaction before we are notified (abort/rollback +// optimization in DTC). + +void XaTransaction::NotifyPhase0() { + if (active) + Phase0Flush(); +} + + +void XaTransaction::AsyncCompleter(Object ^unused) { + bool success = false; + + if (commandCompletionp != NULL) { + try { + // waits for the AMQP broker's response and returns the decoded content + XaResult& xaResult = commandCompletionp->get(); + if (xaResult.hasStatus()) { + if (xaResult.getStatus() == XaStatus::XA_STATUS_XA_OK) { + success = true; + } + } + } + catch (...) { + // TODO: log it? + } + try { + controlSession->ReleaseCompletion((IntPtr) commandCompletionp); + } + catch (...) { + // TODO: log it? + } + + commandCompletionp = NULL; + } + + ITransactionEnlistmentAsync *dtcTxHandle = nativeHandler->txHandle; + + HRESULT hr = success ? S_OK : E_FAIL; + + switch (currentCommand) { + case DTC_PREPARE: + dtcTxHandle->PrepareRequestDone(hr, NULL, NULL); + break; + + case DTC_COMMIT: + dtcTxHandle->CommitRequestDone(hr); + if (success) + resourceManager->DecrementDoubt(); + Complete(); + break; + + case DTC_ABORT: + dtcTxHandle->AbortRequestDone(hr); + if (success) { + if (preparing) { + preparing = false; + resourceManager->DecrementDoubt(); + } + } + Complete(); + break; + + case DTC_SINGLE_PHASE: + if (success) + hr = XACT_S_SINGLEPHASE; + dtcTxHandle->PrepareRequestDone(hr, NULL, NULL); + Complete(); + break; + + case DTC_TMDOWN: + // Stop the RM from accepting new enlistments + resourceManager->TmDown(); + Complete(); + break; + } +} + + +void XaTransaction::Complete() { + Cleanup(); + resourceManager->Complete(systemTransaction); + completionHandle->Set(); +} + + +void XaTransaction::WaitForCompletion() { + completionHandle->WaitOne(); +} + + + /* +void XaTransaction::WaitForFlush() { + isFlushedHandle->WaitOne(); +} + */ + +// called from DtxResourceManager Finalize + +void XaTransaction::ChildFinalize() { + lock l(enlistedSessions); + Phase0Flush(); + Cleanup(); +} + + + +}}} // namespace Apache::Qpid::Interop diff --git a/wcf/src/Apache/Qpid/Interop/XaTransaction.h b/wcf/src/Apache/Qpid/Interop/XaTransaction.h new file mode 100644 index 0000000000..8ff9f99893 --- /dev/null +++ b/wcf/src/Apache/Qpid/Interop/XaTransaction.h @@ -0,0 +1,96 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace System::Transactions; + +enum DtcCallbackType{ + DTC_PREPARE, + DTC_COMMIT, + DTC_ABORT, + DTC_SINGLE_PHASE, + DTC_TMDOWN +}; + + +ref class DtxResourceManager; +class DtcCallbackHandler; + +// Function pointer declaratiom for managed space delegate +typedef bool (__stdcall *DtcCallbackFp)(DtcCallbackType); + +// and the delegate with the same signature +public delegate bool DtcCallbackDelegate(DtcCallbackType); + + + +public ref class XaTransaction +{ +private: + bool active; + DtxResourceManager^ resourceManager; + Transaction^ systemTransaction; + AmqpSession^ controlSession; + Collections::Generic::List^ enlistedSessions; + qpid::framing::Xid* xidp; + DtcCallbackHandler* nativeHandler; + bool preparing; + DtcCallbackDelegate^ inboundDelegate; + // the Qpid async result of the AMQP dtx prepare/commit commands + TypedResult* commandCompletionp; + // the Qpid async result of the first session to do dtx start + TypedResult* firstDtxStartCompletionp; + ManualResetEvent^ completionHandle; + + AmqpSession^ firstEnlistedSession; + DtcCallbackType currentCommand; + void AsyncCompleter(Object ^); + void Phase0Flush(); + void Cleanup(); + void Complete(); + +internal: + XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *pXaHelper, DWORD rmCookie, DtxResourceManager^ rm); + XaTransaction^ Enlist (AmqpSession ^session); + bool DtcCallback (DtcCallbackType callback); + void NotifyPhase0(); + void ChildFinalize(); + void SessionClosing(AmqpSession^ session); + void WaitForCompletion(); + + property IntPtr XidHandle { + IntPtr get () { return (IntPtr) xidp; } + } + +#ifdef QPID_RECOVERY_TEST_HOOK + void ForceRecovery(); + bool debugFailMode; +#endif + +}; + +}}} // namespace Apache::Qpid::Interop + -- cgit v1.2.1