diff options
| author | Alan Conway <aconway@apache.org> | 2008-06-18 17:53:30 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-06-18 17:53:30 +0000 |
| commit | 9bf82c2c8c45a5228643a285f8db0b1061a69ad9 (patch) | |
| tree | d38be99fcb793712c2a2b5fb56dcbbb8294ff818 /cpp/src/qpid/broker | |
| parent | 02757b560356e0ddb090fbe103e0b65db6dbd3b3 (diff) | |
| download | qpid-python-9bf82c2c8c45a5228643a285f8db0b1061a69ad9.tar.gz | |
Bring cluster code up to date.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@669236 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 63 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 35 |
6 files changed, 63 insertions, 44 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 7fc2b6c4f3..b058978ccf 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -33,9 +33,6 @@ using namespace qpid::broker; using boost::dynamic_pointer_cast; using boost::intrusive_ptr; -static const uint8_t BASIC = 1; -static const uint8_t MESSAGE = 2; - RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, DtxManager& _dtxMgr, uint64_t _stagingThreshold) : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), stagingThreshold(_stagingThreshold) {} diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index bdd8edac87..a4a40a03e8 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -288,7 +288,6 @@ bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) { - //TODO: remove the now redundant checks (channel.flow & basic|message.qos removed): blocked = !(filter(msg) && checkCredit(msg) && parent->flowActive && (!ackExpected || parent->checkPrefetch(msg))); return !blocked; } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index bf3a7756b5..1310e6c51a 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -54,8 +54,7 @@ class SessionContext; * SemanticState holds the L3 and L4 state of an open session, whether * attached to a channel or suspended. */ -class SemanticState : public framing::FrameHandler::Chains, - public sys::OutputTask, +class SemanticState : public sys::OutputTask, private boost::noncopyable { class ConsumerImpl : public Consumer, public sys::OutputTask diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 2f09c6b5ac..c752f6315b 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -71,7 +71,7 @@ void SessionHandler::setState(const std::string& name, bool force) { session = connection.broker.getSessionManager().attach(*this, id, force); } -FrameHandler* SessionHandler::getInHandler() { return session.get(); } +FrameHandler* SessionHandler::getInHandler() { return session.get() ? &session->in : 0; } qpid::SessionState* SessionHandler::getState() { return session.get(); } void SessionHandler::readyToSend() { diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index dada7567f9..d7089424a5 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -53,7 +53,11 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) + enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), + inLastHandler(*this), + outLastHandler(*this), + inChain(inLastHandler), + outChain(outLastHandler) { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { @@ -102,20 +106,20 @@ void SessionState::detach() { handler = 0; if (mgmtObject.get() != 0) mgmtObject->set_attached (0); - } +} void SessionState::attach(SessionHandler& h) { // activateOutput can be called in a different thread, lock to protect attached status - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); QPID_LOG(debug, getId() << ": attached on broker."); - handler = &h; - if (mgmtObject.get() != 0) - { - mgmtObject->set_attached (1); - mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h.getChannel()); - } + handler = &h; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); } +} void SessionState::activateOutput() { // activateOutput can be called in a different thread, lock to protect attached status @@ -137,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, switch (methodId) { - case management::Session::METHOD_DETACH : + case management::Session::METHOD_DETACH : if (handler != 0) { handler->sendDetach(); @@ -145,18 +149,18 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; - case management::Session::METHOD_CLOSE : + case management::Session::METHOD_CLOSE : /* - if (handler != 0) - { - handler->getConnection().closeChannel(handler->getChannel()); - } - status = Manageable::STATUS_OK; - break; + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; */ - case management::Session::METHOD_SOLICITACK : - case management::Session::METHOD_RESETLIFESPAN : + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : status = Manageable::STATUS_NOT_IMPLEMENTED; break; } @@ -218,10 +222,12 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg) receiverCompleted(msg->getCommandId()); if (msg->requiresAccept()) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); - } +} -void SessionState::handle(AMQFrame& frame) -{ +void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); } +void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); } + +void SessionState::handleInLast(AMQFrame& frame) { SequenceNumber commandId = receiverGetCurrent(); try { //TODO: make command handling more uniform, regardless of whether @@ -252,6 +258,11 @@ void SessionState::handle(AMQFrame& frame) } } +void SessionState::handleOutLast(AMQFrame& frame) { + assert(handler); + handler->out(frame); +} + DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { uint32_t maxFrameSize = getConnection().getFrameMax(); @@ -267,7 +278,7 @@ void SessionState::sendCompletion() { handler->sendCompletion(); } void SessionState::senderCompleted(const SequenceSet& commands) { qpid::SessionState::senderCompleted(commands); for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++) - semanticState.completed(i->first(), i->last()); + semanticState.completed(i->first(), i->last()); } void SessionState::readyToSend() { @@ -280,4 +291,8 @@ void SessionState::readyToSend() { Broker& SessionState::getBroker() { return broker; } +framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; } + +framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; } + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 7b70789161..f2774dadd3 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -62,10 +62,10 @@ class SessionManager; * themselves have state. */ class SessionState : public qpid::SessionState, - public SessionContext, - public DeliveryAdapter, + public SessionContext, + public DeliveryAdapter, public management::Manageable, - public framing::FrameHandler + public framing::FrameHandler::InOutHandler { public: SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&); @@ -87,8 +87,6 @@ class SessionState : public qpid::SessionState, /** OutputControl **/ void activateOutput(); - void handle(framing::AMQFrame& frame); - void senderCompleted(const framing::SequenceSet& ranges); void sendCompletion(); @@ -99,32 +97,43 @@ class SessionState : public qpid::SessionState, // Manageable entry points management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args); + ManagementMethod (uint32_t methodId, management::Args& args); void readyToSend(); + framing::FrameHandler::Chain& getInChain(); + framing::FrameHandler::Chain& getOutChain(); + private: + void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); + void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); + void enqueued(boost::intrusive_ptr<Message> msg); + + void handleIn(framing::AMQFrame& frame); + void handleOut(framing::AMQFrame& frame); + + // End of the input & output chains. + void handleInLast(framing::AMQFrame& frame); + void handleOutLast(framing::AMQFrame& frame); + Broker& broker; SessionHandler* handler; sys::AbsTime expiry; // Used by SessionManager. sys::Mutex lock; bool ignoring; std::string name; - SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; IncompleteMessageList incomplete; - IncompleteMessageList::CompletionListener enqueuedOp; - management::Session::shared_ptr mgmtObject; - void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); - void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); - void enqueued(boost::intrusive_ptr<Message> msg); + framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleInLast> inLastHandler; + framing::FrameHandler::MemFunRef<SessionState, &SessionState::handleOutLast> outLastHandler; + framing::FrameHandler::Chain inChain, outChain; - friend class SessionManager; + friend class SessionManager; }; |
