/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include #include #include #include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" #include "qpid/client/MessageListener.h" #include "qpid/framing/FrameSet.h" #include "AmqpConnection.h" #include "AmqpSession.h" #include "DtxResourceManager.h" #include "XaTransaction.h" #include "QpidException.h" #include "QpidMarshal.h" namespace Apache { namespace Qpid { namespace Interop { using namespace System; using namespace System::Runtime::InteropServices; using namespace System::Transactions; using namespace msclr; /* * There is one DtxResourceManager per broker and per application process. * * Each RM manages a collection of active XaTransaction objects. Participating AmqpSessions enlist * (or re-enlist) with an XaTransaction indexed by the corresponding System.Transaction object. The * RM maintains its own AmqpSession for sending 2PC commnds (dtxPrepare, dtxCommit etc.). The * XaTransaction object works through the lifecycle of the Transaction, including prompting the * enlisted sessions to send their delimiting dtxEnd commands. * * A separate DtcPlugin.cpp file provides the recovery logic when needed in a library named * qpidxarm.dll. The MSDTC maintans recovery info in its log and tracks when there may be * transactions in doubt. See the documentation for IDtcToXaHelperSinglePipe. * * To enable transaction support: * DTC requires a registry key to find the plugin * [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\MSDTC\XADLL] qpidxarm.dll -> [path to qpidxarm.dll] * DTC needs to be configured for XA * cmdprompt -> dcomcnfg -> Component services -> My Computer -> DTC -> Local DTC -> right click properties -> Security -> Enable XA Transactions * */ // TODO: provide shutdown mechanism, perhaps callback from Connection Idle for enlisted connections. // But note that a new RM registration with the DTC is very expensive. DtxResourceManager::DtxResourceManager(AmqpConnection^ appConnection) { dtcComp = NULL; xaHelperp = NULL; rmCookie = 0; doubtCount = 0; tmDown = false; AmqpConnection^ clonedCon = appConnection->Clone(); dtxControlSession = clonedCon->CreateSession(); dataSourceName = clonedCon->DataSourceName; transactionMap = gcnew Collections::Generic::Dictionary(); HRESULT hr; try { // instead of pinning this instance, just use tmp stack variables for small stuff IUnknown* tmp = NULL; // request the default DTC hr = DtcGetTransactionManager(NULL, NULL, IID_IUnknown, 0, 0, 0, (void **)&tmp); if (hr != S_OK) throw gcnew QpidException("connection failure to DTC service"); dtcComp = tmp; IDtcToXaHelperSinglePipe *tmp2 = NULL; hr = ((IUnknown *)dtcComp)->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void**) &tmp2); if (hr != S_OK) throw gcnew QpidException("DTC XA unavailable"); xaHelperp = tmp2; std::string native_dsn = QpidMarshal::ToNative(dataSourceName); DWORD tmp3; // This call doesn't return until the DTC has opened and closed a connection to the broker // and written a recovery entry in its log. hr = ((IDtcToXaHelperSinglePipe *) xaHelperp)->XARMCreate(const_cast(native_dsn.c_str()), "qpidxarm.dll", &tmp3); if (hr != S_OK) { switch (hr) { case E_FAIL: throw gcnew QpidException("Resource Manager DLL configuration error"); case E_INVALIDARG: throw gcnew QpidException("Resource Manager internal error"); case E_OUTOFMEMORY: throw gcnew QpidException("Resource Manager out of memory"); case E_UNEXPECTED: throw gcnew QpidException("Resource Manager internal failure"); case XACT_E_TMNOTAVAILABLE: case XACT_E_CONNECTION_DOWN: throw gcnew QpidException("MSDTC unavailable"); default: throw gcnew QpidException("Resource Manager Registration failed"); } } rmCookie = tmp3; } finally { if (rmCookie == 0) { // undo partial construction Cleanup(); } } } DtxResourceManager::!DtxResourceManager() { Cleanup(); } DtxResourceManager::~DtxResourceManager() { GC::SuppressFinalize(this); Cleanup(); } // Called when the DTC COM proxy sends TMDOWN to a pending XaTransaction // called once for each outstanding tx void DtxResourceManager::TmDown() { // this block is the only place where both locks are held lock l1(transactionMap); lock l2(resourceManagerMap); if (tmDown) return; tmDown = true; resourceManagerMap->Remove(this->dataSourceName); // defer cleanup until last TmDown notification received } void DtxResourceManager::Cleanup() { for each (Collections::Generic::KeyValuePair kvp in transactionMap) { XaTransaction^ xaTr = kvp.Value; xaTr->ChildFinalize(); } try { if (rmCookie != 0) { // implies no recovery needed bool cleanSession = (doubtCount == 0) && (transactionMap->Count == 0); ((IDtcToXaHelperSinglePipe *)xaHelperp)->ReleaseRMCookie(rmCookie, cleanSession); rmCookie = 0; } if (xaHelperp != NULL) { ((IDtcToXaHelperSinglePipe *) xaHelperp)->Release(); xaHelperp = NULL; } if (dtcComp != NULL) { ((IUnknown *) dtcComp)->Release(); dtcComp = NULL; } if (dtxControlSession != nullptr) { dtxControlSession->Connection->Close(); } } catch (Exception^) {} } XaTransaction^ DtxResourceManager::GetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) { // find or create the RM instance associated with the session's broker AmqpConnection^ connection = appSession->Connection; DtxResourceManager^ instance = connection->CachedResourceManager; // try cached rm first if (instance != nullptr) { XaTransaction^ xaTx = instance->InternalGetXaTransaction(appSession, transaction); if (xaTx != nullptr) return xaTx; else { // cached version no longer available, force new rm creation connection->CachedResourceManager = nullptr; } } lock l(resourceManagerMap); String^ dsn = connection->DataSourceName; if (!resourceManagerMap->TryGetValue(dsn, instance)) { instance = gcnew DtxResourceManager(connection->Clone()); resourceManagerMap->Add(dsn, instance); connection->CachedResourceManager = instance; } l.release(); return instance->InternalGetXaTransaction(appSession, transaction); } XaTransaction^ DtxResourceManager::InternalGetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) { // find or create the tx proxy instance associated with the DTC transaction lock l(transactionMap); if (tmDown) return nullptr; XaTransaction^ xaTransaction = nullptr; if (!transactionMap->TryGetValue(transaction, xaTransaction)) { xaTransaction = gcnew XaTransaction(transaction, (IDtcToXaHelperSinglePipe *) xaHelperp, rmCookie, this); transactionMap->Add(transaction, xaTransaction); } return xaTransaction; } void DtxResourceManager::Complete(Transaction ^tx) { lock l(transactionMap); transactionMap->Remove(tx); if (tmDown && (transactionMap->Count == 0)) { // no more activity on this instance GC::SuppressFinalize(this); Cleanup(); } } void DtxResourceManager::IncrementDoubt() { Interlocked::Increment(doubtCount); } void DtxResourceManager::DecrementDoubt() { Interlocked::Decrement(doubtCount); } #ifdef QPID_RECOVERY_TEST_HOOK void DtxResourceManager::ForceRecovery(Transaction ^tx) { lock l(resourceManagerMap); for each (Collections::Generic::KeyValuePair kvp in resourceManagerMap) { Collections::Generic::Dictionary^ txmap = kvp.Value->transactionMap; XaTransaction^ xaTransaction = nullptr; lock l2(txmap); if (txmap->TryGetValue(tx, xaTransaction)) { xaTransaction->ForceRecovery(); } } } #endif }}} // namespace Apache::Qpid::Interop