diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/broker/SessionState.cpp | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 245 |
1 files changed, 222 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 1ca7b6dfc1..1ab17e9893 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -25,6 +25,7 @@ #include "qpid/broker/SessionManager.h" #include "qpid/broker/SessionHandler.h" #include "qpid/broker/RateFlowcontrol.h" +#include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Timer.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" @@ -60,9 +61,9 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore()), - enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), mgmtObject(0), - rateFlowcontrol(0) + rateFlowcontrol(0), + asyncCommandCompleter(new AsyncCommandCompleter(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -95,6 +96,7 @@ void SessionState::addManagementObject() { } SessionState::~SessionState() { + asyncCommandCompleter->cancel(); semanticState.closed(); if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -125,6 +127,7 @@ bool SessionState::isLocal(const ConnectionToken* t) const void SessionState::detach() { QPID_LOG(debug, getId() << ": detached on broker."); + asyncCommandCompleter->detached(); disableOutput(); handler = 0; if (mgmtObject != 0) @@ -145,6 +148,7 @@ void SessionState::attach(SessionHandler& h) { mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } + asyncCommandCompleter->attached(); } void SessionState::abort() { @@ -202,15 +206,17 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, } void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { + currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). Invoker::Result invocation = invoke(adapter, *method); - receiverCompleted(id); + if (currentCommandComplete) receiverCompleted(id); + if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync()) { - incomplete.process(enqueuedOp, true); + + if (method->isSync() && currentCommandComplete) { sendAcceptAndCompletion(); } } @@ -253,23 +259,14 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) header.setEof(false); msg->getFrames().append(header); } + if (broker.isTimestamping()) + msg->setTimestamp(); msg->setPublisher(&getConnection()); + msg->getIngressCompletion().begin(); semanticState.handle(msg); msgBuilder.end(); - - if (msg->isEnqueueComplete()) { - enqueued(msg); - } else { - incomplete.add(msg); - } - - //hold up execution until async enqueue is complete - if (msg->getFrames().getMethod()->isSync()) { - incomplete.process(enqueuedOp, true); - sendAcceptAndCompletion(); - } else { - incomplete.process(enqueuedOp, false); - } + IncompleteIngressMsgXfer xfer(this, msg); + msg->getIngressCompletion().end(xfer); // allows msg to complete xfer } // Handle producer session flow control @@ -319,11 +316,41 @@ void SessionState::sendAcceptAndCompletion() sendCompletion(); } -void SessionState::enqueued(boost::intrusive_ptr<Message> msg) +/** Invoked when the given inbound message is finished being processed + * by all interested parties (eg. it is done being enqueued to all queues, + * its credit has been accounted for, etc). At this point, msg is considered + * by this receiver as 'completed' (as defined by AMQP 0_10) + */ +void SessionState::completeRcvMsg(SequenceNumber id, + bool requiresAccept, + bool requiresSync) { - receiverCompleted(msg->getCommandId()); - if (msg->requiresAccept()) - accepted.add(msg->getCommandId()); + // Mark this as a cluster-unsafe scope since it can be called in + // journal threads or connection threads as part of asynchronous + // command completion. + sys::ClusterUnsafeScope cus; + + bool callSendCompletion = false; + receiverCompleted(id); + if (requiresAccept) + // will cause msg's seq to appear in the next message.accept we send. + accepted.add(id); + + // Are there any outstanding Execution.Sync commands pending the + // completion of this msg? If so, complete them. + while (!pendingExecutionSyncs.empty() && + receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) { + const SequenceNumber id = pendingExecutionSyncs.front(); + pendingExecutionSyncs.pop(); + QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed."); + receiverCompleted(id); + callSendCompletion = true; // likely peer is pending for this completion. + } + + // if the sender has requested immediate notification of the completion... + if (requiresSync || callSendCompletion) { + sendAcceptAndCompletion(); + } } void SessionState::handleIn(AMQFrame& frame) { @@ -396,4 +423,176 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { return handler->getClusterOrderProxy(); } + +// Current received command is an execution.sync command. +// Complete this command only when all preceding commands have completed. +// (called via the invoker() in handleCommand() above) +void SessionState::addPendingExecutionSync() +{ + SequenceNumber syncCommandId = receiverGetCurrent(); + if (receiverGetIncomplete().front() < syncCommandId) { + currentCommandComplete = false; + pendingExecutionSyncs.push(syncCommandId); + asyncCommandCompleter->flushPendingMessages(); + QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); + } +} + + +/** factory for creating a reference-counted IncompleteIngressMsgXfer object + * which will be attached to a message that will be completed asynchronously. + */ +boost::intrusive_ptr<AsyncCompletion::Callback> +SessionState::IncompleteIngressMsgXfer::clone() +{ + // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed. + // If the client is pending the message.transfer completion, flush now to force immediate write to journal. + if (requiresSync) + msg->flush(); + else { + // otherwise, we need to track this message in order to flush it if an execution.sync arrives + // before it has been completed (see flushPendingMessages()) + pending = true; + completerContext->addPendingMessage(msg); + } + + return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this)); +} + + +/** Invoked by the asynchronous completer associated with a received + * msg that is pending Completion. May be invoked by the IO thread + * (sync == true), or some external thread (!sync). + */ +void SessionState::IncompleteIngressMsgXfer::completed(bool sync) +{ + if (pending) completerContext->deletePendingMessage(id); + if (!sync) { + /** note well: this path may execute in any thread. It is safe to access + * the scheduledCompleterContext, since *this has a shared pointer to it. + * but not session! + */ + session = 0; + QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); + completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); + } else { + // this path runs directly from the ac->end() call in handleContent() above, + // so *session is definately valid. + if (session->isAttached()) { + QPID_LOG(debug, ": receive completed for msg seq=" << id); + session->completeRcvMsg(id, requiresAccept, requiresSync); + } + } + completerContext = boost::intrusive_ptr<AsyncCommandCompleter>(); +} + + +/** Scheduled from an asynchronous command's completed callback to run on + * the IO thread. + */ +void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt) +{ + ctxt->completeCommands(); +} + + +/** Track an ingress message that is pending completion */ +void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg); + bool unique = pendingMsgs.insert(item).second; + if (!unique) { + assert(false); + } +} + + +/** pending message has completed */ +void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + pendingMsgs.erase(id); +} + + +/** done when an execution.sync arrives */ +void SessionState::AsyncCommandCompleter::flushPendingMessages() +{ + std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now. + } + // drop lock, so it is safe to call "flush()" + for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin(); + i != copy.end(); ++i) { + i->second->flush(); + } +} + + +/** mark an ingress Message.Transfer command as completed. + * This method must be thread safe - it may run on any thread. + */ +void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd, + bool requiresAccept, + bool requiresSync) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + + if (session && isAttached) { + MessageInfo msg(cmd, requiresAccept, requiresSync); + completedMsgs.push_back(msg); + if (completedMsgs.size() == 1) { + session->getConnection().requestIOProcessing(boost::bind(&schedule, + session->asyncCommandCompleter)); + } + } +} + + +/** Cause the session to complete all completed commands. + * Executes on the IO thread. + */ +void SessionState::AsyncCommandCompleter::completeCommands() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + + // when session is destroyed, it clears the session pointer via cancel(). + if (session && session->isAttached()) { + for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin(); + msg != completedMsgs.end(); ++msg) { + session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync); + } + } + completedMsgs.clear(); +} + + +/** cancel any pending calls to scheduleComplete */ +void SessionState::AsyncCommandCompleter::cancel() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + session = 0; +} + + +/** inform the completer that the session has attached, + * allows command completion scheduling from any thread */ +void SessionState::AsyncCommandCompleter::attached() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + isAttached = true; +} + + +/** inform the completer that the session has detached, + * disables command completion scheduling from any thread */ +void SessionState::AsyncCommandCompleter::detached() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + isAttached = false; +} + }} // namespace qpid::broker |