/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include "qpid/client/AsyncSession.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/FrameSet.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" #include "qpid/client/MessageListener.h" #include "qpid/client/Demux.h" #include "qpid/client/SessionImpl.h" #include "qpid/client/SessionBase_0_10Access.h" #include "MessageBodyStream.h" #include "AmqpMessage.h" #include "AmqpSession.h" #include "InputLink.h" #include "QpidMarshal.h" #include "QpidException.h" namespace Apache { namespace Qpid { namespace Interop { using namespace System; using namespace System::Runtime::InteropServices; using namespace System::Threading; using namespace msclr; using namespace qpid::client; using namespace qpid::framing; using namespace std; using namespace Apache::Qpid::AmqpTypes; // Scalability note: When using async methods, an async helper thread is created // to block on the Demux BlockingQueue. This design should be revised in line // with proposed changes to the native library to reduce the number of servicing // threads for large numbers of subscriptions. // synchronization is accomplished with locks, but also by ensuring that only one // MessageWaiter (the one at the front of the line) is ever active. // async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch // thread (who deposits FrameSets into the local queue and is oblivious to the // managed space locks). // The folowing def must match the "Frames" private typedef. // TODO, make Qpid-cpp "Frames" definition visible. typedef qpid::InlineVector FrameSetFrames; InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange) : amqpSession(session), subscriptionp(NULL), localQueuep(NULL), queuePtrp(NULL), dequeuedFrameSetpp(NULL), disposed(false), finalizing(false) { bool success = false; System::Exception^ linkException = nullptr; waiters = gcnew Collections::Generic::List(); linkLock = waiters; // private and available subscriptionLock = gcnew Object(); qpidAddress = QpidAddress::CreateAddress(sourceQueue, true); qpidAddress->ResolveLink(session); browsing = qpidAddress->Browsing; try { std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName); if (temporary) { qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true); qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange), arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey)); qpidSessionp->sync(); } localQueuep = new LocalQueue; SubscriptionSettings settings; settings.flowControl = FlowControl::messageCredit(0); settings.completionMode = CompletionMode::MANUAL_COMPLETION; if (browsing) { settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED; settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE; } else { settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED; settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT; } Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup // the roundabout way to obtain localQueuep->queue SessionBase_0_10Access sa(*qpidSessionp); boost::shared_ptr simpl = sa.get(); queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName())); success = true; } finally { if (!success) { Cleanup(); linkException = gcnew QpidException ("InputLink creation failure"); throw linkException; } } } // called with lock held void InputLink::ReleaseNative() { // involves talking to the Broker unless the connection is broken if ((subscriptionp != NULL) && !finalizing) { // TODO: find boost time error on cleanup when in finalizer thread try { subscriptionp->cancel(); } catch (const std::exception& error) { // TODO: log this properly std::cout << "shutdown error " << error.what() << std::endl; } } // free native mem (or smart pointers) that we own if (subscriptionp != NULL) { delete subscriptionp; subscriptionp = NULL; } if (queuePtrp != NULL) { delete queuePtrp; queuePtrp = NULL; } if (localQueuep != NULL) { if (!finalizing) { // TODO: find boost time error on cleanup when in finalizer thread delete localQueuep; localQueuep = NULL; } } if (dequeuedFrameSetpp != NULL) { delete dequeuedFrameSetpp; dequeuedFrameSetpp = NULL; } } void InputLink::Cleanup() { { lock l(linkLock); if (disposed) return; disposed = true; // if the asyncHelper exists and is idle, unblock it if (asyncHelperWaitHandle != nullptr) { asyncHelperWaitHandle->Set(); } // wakeup anyone waiting for messages if (queuePtrp != NULL) (*queuePtrp)->close(); // wait for any sync operations on the subscription to complete before ReleaseNative lock l2(subscriptionLock); try {} finally { ReleaseNative(); } } // Now that subscription is torn down, we can execute pending delete on remote node qpidAddress->CleanupLink(amqpSession); amqpSession->NotifyClosed(); } InputLink::~InputLink() { Cleanup(); } InputLink::!InputLink() { finalizing = true; Cleanup(); } void InputLink::Close() { // Simulate Dispose()... Cleanup(); GC::SuppressFinalize(this); } // call with lock held bool InputLink::haveMessage() { if (dequeuedFrameSetpp != NULL) return true; if (queuePtrp != NULL) { if ((*queuePtrp)->size() > 0) return true; } return false; } IntPtr InputLink::nextLocalMessage() { lock l(linkLock); if (disposed) return (IntPtr) NULL; // A message already pulled off BlockingQueue? if (dequeuedFrameSetpp != NULL) { QpidFrameSetPtr* rv = dequeuedFrameSetpp; dequeuedFrameSetpp = NULL; return (IntPtr) rv; } if ((*queuePtrp)->empty()) return (IntPtr) NULL; bool received = false; QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr; try { received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE); if (received) { QpidFrameSetPtr* rv = frameSetpp; // no need to free native in finally block frameSetpp = NULL; return (IntPtr) rv; } } catch(const std::exception& error) { // should be no async tampering with queue since we hold the lock and have a // smart pointer ref to the native LocalQueue, even if the network connection fails... cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <close(); } // Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks // a blocking thread without interefering with queue contents or the ability to push // new incoming messages. void InputLink::resetQueue() { lock l(linkLock); if (disposed) return; if ((*queuePtrp)->isClosed()) { (*queuePtrp)->open(); } } // returns true if there is a message to consume, i.e. nextLocalMessage() won't block bool InputLink::internalWaitForMessage() { Demux::QueuePtr demuxQueuePtr; bool received = false; QpidFrameSetPtr* frameSetpp = NULL; try { lock l(linkLock); if (disposed) return false; if (haveMessage()) return true; AdjustCredit(); // get a scoped smart ptr ref to guard against async close or hangup demuxQueuePtr = *queuePtrp; frameSetpp = new QpidFrameSetPtr; l.release(); // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired. received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE); l.acquire(); if (received) { dequeuedFrameSetpp = frameSetpp; frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream } return true; } catch(const std::exception& ) { // timeout or connection closed return false; } finally { if (frameSetpp != NULL) { delete frameSetpp; } } return false; } // call with lock held void InputLink::addWaiter(MessageWaiter^ waiter) { waiters->Add(waiter); if (waiters->Count == 1) { // mark this waiter as ready to run // Only the waiter at the head of the queue is active. waiter->Activate(); } if (waiter->Assigned) return; if (asyncHelperWaitHandle == nullptr) { asyncHelperWaitHandle = gcnew ManualResetEvent(false); ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper); (gcnew Thread(threadDelegate))->Start(); } if (waiters->Count == 1) { // wake up the asyncHelper asyncHelperWaitHandle->Set(); } } void InputLink::removeWaiter(MessageWaiter^ waiter) { // a waiter can be removed from anywhere in the list if timed out lock l(linkLock); int idx = waiters->IndexOf(waiter); if (idx == -1) { // TODO: assert or log if (asyncHelperWaitHandle != nullptr) { // just in case. asyncHelperWaitHandle->Set(); } return; } waiters->RemoveAt(idx); if (waiter->TimedOut) { // may have to give back message if it arrives momentarily AdjustCredit(); } // let the next waiter know it's his turn. if (waiters->Count > 0) { MessageWaiter^ nextWaiter = waiters[0]; // wakeup the asyncHelper thread to help out if necessary. if (!nextWaiter->Assigned) { asyncHelperWaitHandle->Set(); } l.release(); nextWaiter->Activate(); return; } else { if (disposed && (asyncHelperWaitHandle != nullptr)) { asyncHelperWaitHandle->Set(); } } } void InputLink::asyncHelper() { lock l(linkLock); while (true) { if (disposed && (waiters->Count == 0)) { asyncHelperWaitHandle = nullptr; return; } if (waiters->Count > 0) { MessageWaiter^ waiter = waiters[0]; l.release(); if (waiter->AcceptForWork()) { waiter->Run(); } l.acquire(); } // sleep if more work may be coming or it is currently someone else's turn if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) { // wait for something to do asyncHelperWaitHandle->Reset(); l.release(); asyncHelperWaitHandle->WaitOne(); l.acquire(); } } } void InputLink::sync() { // used by the MessageWaiter timeout thread to not run before fully initialized lock l(linkLock); } void InputLink::PrefetchLimit::set(int value) { lock l(linkLock); prefetchLimit = value; int delta = 0; // rough rule of thumb to keep the flow, but reduce chatter. // for small messages, the credit request is almost as expensive as the transfer itself. // experience may suggest a better heuristic or require a property for the low water mark if (prefetchLimit >= 3) { delta = prefetchLimit / 3; } minWorkingCredit = prefetchLimit - delta; AdjustCredit(); } // call with lock held void InputLink::AdjustCredit() { if (creditSyncPending || disposed) return; // low watermark check if ((prefetchLimit != 0) && (workingCredit >= minWorkingCredit) && (workingCredit >= waiters->Count)) return; // should have enough for all waiters or to satisfy the prefetch window int targetCredit = waiters->Count; if (targetCredit < prefetchLimit) targetCredit = prefetchLimit; if (targetCredit > workingCredit) { subscriptionp->grantMessageCredit(targetCredit - workingCredit); workingCredit = targetCredit; return; } if (targetCredit < workingCredit) { if ((targetCredit == 0) && (prefetchLimit == 0)) { creditSyncPending = true; ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit)); } // TODO: also shrink credit when prefetchLimit != 0 } } void InputLink::SyncCredit(Object ^unused) { lock l(linkLock); try { if (disposed) return; if (!amqpSession->MessageStop(subscriptionp->getName())) { // connection closed return; } l.release(); // use setFlowControl to re-enable credit flow on the broker. // setFlowControl is a sync operation { lock l2(subscriptionLock); if (subscriptionp != NULL) { subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl); } } l.acquire(); if (disposed) return; // let existing waiters use up any messages that arrived. // local queue size can only decrease until more credit is issued while (true) { if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { l.release(); // a rare use case and not used in performance oriented code. // optimization can wait until the qpid/messaging api is used Thread::Sleep(10); l.acquire(); if (disposed) return; } else { break; } } // At this point, the lock is held and we are fully synced with the broker // so we have a valid snapshot if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) { // can't be sure application will request a message again any time soon QpidFrameSetPtr frameSetp; while (!(*queuePtrp)->empty()) { (*queuePtrp)->pop(frameSetp); SequenceSet frameSetID(frameSetp->getId()); subscriptionp->release(frameSetID); } // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a // MessageWaiter about to to get the nextLocalMessage(), or implicitely // from a WaitForMessage(). } // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit workingCredit = (*queuePtrp)->size(); if (dequeuedFrameSetpp != NULL) { workingCredit++; } } finally { creditSyncPending = false; } AdjustCredit(); } AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) { QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); bool ownFrameSet = true; bool haveProperties = false; try { MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp); ownFrameSet = false; // stream releases on close/dispose AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream); AMQHeaderBody* headerBodyp = (*fspp)->getHeaders(); uint64_t contentSize = (*fspp)->getContentSize(); SequenceSet frameSetID((*fspp)->getId()); // target managed representation AmqpProperties^ amqpProperties = gcnew AmqpProperties(); // source native representation const DeliveryProperties* deliveryProperties = headerBodyp->get(); const qpid::framing::MessageProperties* messageProperties = headerBodyp->get(); if (deliveryProperties) { if (deliveryProperties->hasRoutingKey()) { haveProperties = true; amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str()); } if (deliveryProperties->hasDeliveryMode()) { if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT) amqpProperties->Durable = true; } if (deliveryProperties->hasTtl()) { long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond; amqpProperties->TimeToLive = Nullable(TimeSpan::FromTicks(ticks)); } } if (messageProperties) { if (messageProperties->hasReplyTo()) { haveProperties = true; const ReplyTo& rpto = messageProperties->getReplyTo(); String^ rk = nullptr; String^ ex = nullptr; if (rpto.hasRoutingKey()) { rk = gcnew String(rpto.getRoutingKey().c_str()); } if (rpto.hasExchange()) { ex = gcnew String(rpto.getExchange().c_str()); } amqpProperties->SetReplyTo(ex,rk); } if (messageProperties->hasContentType()) { haveProperties = true; amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str()); if (messageProperties->hasContentEncoding()) { String^ enc = gcnew String(messageProperties->getContentEncoding().c_str()); if (!String::IsNullOrEmpty(enc)) { // TODO: properly assemble 1.0 style to 0-10 for all cases amqpProperties->ContentType += "; charset=" + enc; } } } if (messageProperties->hasCorrelationId()) { haveProperties = true; const std::string& ncid = messageProperties->getCorrelationId(); int len = ncid.size(); array^ mcid = gcnew array(len); Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len); amqpProperties->CorrelationId = mcid; } if (messageProperties->hasUserId()) { haveProperties = true; const std::string& nuid = messageProperties->getUserId(); int len = nuid.size(); array^ muid = gcnew array(len); Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len); amqpProperties->UserId = muid; } if (messageProperties->hasApplicationHeaders()) { haveProperties = true; const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders(); int count = fieldTable.count(); if (count > 0) { haveProperties = true; Collections::Generic::Dictionary^ mmap = gcnew Collections::Generic::Dictionary(count); for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) { qpid::framing::FieldValue::Data &data = i->second->getData(); // TODO: replace these generic int/string conversions with handler for each AMQP specific type: // uint8_t dataType = i->second->getType(); // switch (dataType) { case TYPE_CODE_STR8: ... } if (data.convertsToInt()) { mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt())); } if (data.convertsToString()) { std::string ns = data.getString(); String^ ms = gcnew String(ns.data(), 0, ns.size()); mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms)); } } amqpProperties->PropertyMap = mmap; } } } if (haveProperties) { amqpMessage->Properties = amqpProperties; } // We have a message we can return to the caller. // Tell the broker we got it. // subscriptionp->accept(frameSetID) is a slow sync operation in the native API // so do it within the AsyncSession directly amqpSession->AcceptAndComplete(frameSetID, browsing); workingCredit--; // check if more messages need to be requested from broker AdjustCredit(); return amqpMessage; } finally { if (ownFrameSet) delete (fspp); } } // As for IInputChannel: // if success, return true + amqpMessage // elseif timeout, return false // elseif closed/EOF, return true and amqpMessage = null // else throw an Exception bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage) { lock l(linkLock); if (waiters->Count == 0) { // see if there is a message already available without blocking IntPtr fspp = nextLocalMessage(); if (fspp.ToPointer() != NULL) { amqpMessage = createAmqpMessage(fspp); return true; } } MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr); addWaiter(waiter); l.release(); waiter->Run(); l.acquire(); if (waiter->TimedOut) { return false; } IntPtr waiterMsg = waiter->Message; if (waiterMsg.ToPointer() == NULL) { if (disposed) { // indicate normal EOF on channel amqpMessage = nullptr; return true; } } amqpMessage = createAmqpMessage(waiterMsg); return true; } IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state) { //TODO: if haveMessage() complete synchronously lock l(linkLock); MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state); addWaiter(waiter); return waiter; } bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage) { // TODO: validate result MessageWaiter^ waiter = (MessageWaiter ^) result; waiter->WaitForCompletion(); if (waiter->RunException != nullptr) throw waiter->RunException; if (waiter->TimedOut) { amqpMessage = nullptr; return false; } IntPtr waiterMsg = waiter->Message; if (waiterMsg.ToPointer() == NULL) { if (disposed) { // indicate normal EOF on channel amqpMessage = nullptr; return true; } } amqpMessage = createAmqpMessage(waiterMsg); return true; } bool InputLink::WaitForMessage(TimeSpan timeout) { lock l(linkLock); if (disposed) return false; if (waiters->Count == 0) { // see if there is a message already available without blocking if (haveMessage()) return true; } // Same as for TryReceive, except consuming = false MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr); addWaiter(waiter); l.release(); waiter->Run(); l.acquire(); if (waiter->TimedOut) { return false; } return haveMessage(); } IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state) { lock l(linkLock); // Same as for BeginTryReceive, except consuming = false MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state); addWaiter(waiter); return waiter; } bool InputLink::EndWaitForMessage(IAsyncResult^ result) { MessageWaiter^ waiter = (MessageWaiter ^) result; waiter->WaitForCompletion(); if (waiter->TimedOut) { return false; } return haveMessage(); } }}} // namespace Apache::Qpid::Interop