From 2f6d6ad7efd788b71204af67dff51b6233881e2e Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 21 Sep 2007 18:26:37 +0000 Subject: Split broker::Session into: broker::SessionState: session info (uuid etc.) + handler chains. broker::SemanticState: session state for the SemanticHandler. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@578219 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 6 +- cpp/src/qpid/broker/BrokerAdapter.cpp | 48 ++- cpp/src/qpid/broker/BrokerAdapter.h | 36 +- cpp/src/qpid/broker/Connection.cpp | 7 +- cpp/src/qpid/broker/Connection.h | 3 +- cpp/src/qpid/broker/DeliveryRecord.cpp | 4 +- cpp/src/qpid/broker/DeliveryRecord.h | 4 +- cpp/src/qpid/broker/DtxHandlerImpl.cpp | 15 +- cpp/src/qpid/broker/DtxHandlerImpl.h | 4 +- cpp/src/qpid/broker/HandlerImpl.h | 29 +- cpp/src/qpid/broker/MessageHandlerImpl.cpp | 40 +- cpp/src/qpid/broker/MessageHandlerImpl.h | 6 +- cpp/src/qpid/broker/SemanticHandler.cpp | 30 +- cpp/src/qpid/broker/SemanticHandler.h | 18 +- cpp/src/qpid/broker/SemanticState.cpp | 591 ++++++++++++++++++++++++++++ cpp/src/qpid/broker/SemanticState.h | 184 +++++++++ cpp/src/qpid/broker/Session.cpp | 608 ----------------------------- cpp/src/qpid/broker/Session.h | 201 ---------- cpp/src/qpid/broker/SessionHandler.cpp | 4 +- cpp/src/qpid/broker/SessionHandler.h | 8 +- cpp/src/qpid/broker/SessionState.cpp | 63 +++ cpp/src/qpid/broker/SessionState.h | 67 +++- cpp/src/qpid/broker/SuspendedSessions.h | 6 +- 23 files changed, 1016 insertions(+), 966 deletions(-) create mode 100644 cpp/src/qpid/broker/SemanticState.cpp create mode 100644 cpp/src/qpid/broker/SemanticState.h delete mode 100644 cpp/src/qpid/broker/Session.cpp delete mode 100644 cpp/src/qpid/broker/Session.h create mode 100644 cpp/src/qpid/broker/SessionState.cpp (limited to 'cpp/src') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index cf7029dabc..bdac539d92 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -187,8 +187,10 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveryManagerImpl.cpp \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ - qpid/broker/Session.h \ - qpid/broker/Session.cpp \ + qpid/broker/SemanticState.h \ + qpid/broker/SemanticState.cpp \ + qpid/broker/SessionState.h \ + qpid/broker/SessionState.cpp \ qpid/broker/SessionHandler.h \ qpid/broker/SessionHandler.cpp \ qpid/broker/SemanticHandler.cpp \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index c266b36dfb..0fb521d626 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -16,8 +16,6 @@ * */ #include "BrokerAdapter.h" -#include "Session.h" -#include "SessionHandler.h" #include "Connection.h" #include "DeliveryToken.h" #include "MessageDelivery.h" @@ -38,7 +36,7 @@ typedef std::vector QueueVector; // by the handlers responsible for those classes. // -BrokerAdapter::BrokerAdapter(Session& s) : +BrokerAdapter::BrokerAdapter(SemanticState& s) : HandlerImpl(s), basicHandler(s), exchangeHandler(s), @@ -153,7 +151,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = getSession().getQueue(name); + Queue::shared_ptr queue = state.getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return QueueQueryResult(queue->getName(), @@ -176,7 +174,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = getSession().getQueue(name); + queue = state.getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair queue_created = @@ -187,7 +185,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - getSession().setDefaultQueue(queue); + state.setDefaultQueue(queue); if (alternate) { queue->setAlternateExchange(alternate); alternate->incAlternateUsers(); @@ -216,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -239,7 +237,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, const string& routingKey, const qpid::framing::FieldTable& arguments ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); @@ -252,12 +250,12 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ - getSession().getQueue(queue)->purge(); + state.getQueue(queue)->purge(); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = getSession().getQueue(queue); + Queue::shared_ptr q = state.getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -279,8 +277,8 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ //TODO: handle global - getSession().setPrefetchSize(prefetchSize); - getSession().setPrefetchCount(prefetchCount); + state.setPrefetchSize(prefetchSize); + state.setPrefetchCount(prefetchCount); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -289,8 +287,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = getSession().getQueue(queueName); - if(!consumerTag.empty() && getSession().exists(consumerTag)){ + Queue::shared_ptr queue = state.getQueue(queueName); + if(!consumerTag.empty() && state.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } string newTag = consumerTag; @@ -298,7 +296,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); - getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); + state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); if(!nowait) getProxy().getBasic().consumeOk(newTag); @@ -308,13 +306,13 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, } void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ - getSession().cancel(consumerTag); + state.cancel(consumerTag); } void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); - if(!getSession().get(token, queue, !noAck)){ + if(!state.get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack getProxy().getBasic().getEmpty(clusterId); @@ -323,9 +321,9 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ if (multiple) { - getSession().ackCumulative(deliveryTag); + state.ackCumulative(deliveryTag); } else { - getSession().ackRange(deliveryTag, deliveryTag); + state.ackRange(deliveryTag, deliveryTag); } } @@ -333,23 +331,23 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) { - getSession().recover(requeue); + state.recover(requeue); } void BrokerAdapter::TxHandlerImpl::select() { - getSession().startTx(); + state.startTx(); } void BrokerAdapter::TxHandlerImpl::commit() { - getSession().commit(&getBroker().getStore()); + state.commit(&getBroker().getStore()); } void BrokerAdapter::TxHandlerImpl::rollback() { - getSession().rollback(); - getSession().recover(false); + state.rollback(); + state.recover(false); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index ec6b4aa0fc..5537dc67f5 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -20,7 +20,7 @@ */ #include "DtxHandlerImpl.h" #include "MessageHandlerImpl.h" -#include "NameGenerator.h" + #include "qpid/Exception.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/reply_exceptions.h" @@ -44,6 +44,7 @@ class StreamHandler; class DtxHandler; class TunnelHandler; class MessageHandlerImpl; +class Exchange; /** * Per-channel protocol adapter. @@ -54,16 +55,10 @@ class MessageHandlerImpl; * peer. * */ - -// TODO aconway 2007-09-18: BrokerAdapter is no longer an appropriate way -// to group methods as seen by the BADHANDLERs below. -// Handlers should be grouped by layer, the BrokerAdapter stuff -// belongs on the SemanticHandler. -// class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations { public: - BrokerAdapter(Session& session); + BrokerAdapter(SemanticState& session); BasicHandler* getBasicHandler() { return &basicHandler; } ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } @@ -73,7 +68,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations MessageHandler* getMessageHandler() { return &messageHandler; } DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } - framing::ProtocolVersion getVersion() const { return getConnection().getVersion(); } + + framing::ProtocolVersion getVersion() const { return session.getVersion();} AccessHandler* getAccessHandler() { @@ -99,7 +95,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { public: - ExchangeHandlerImpl(Session& session) : HandlerImpl(session) {} + ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {} void declare(uint16_t ticket, const std::string& exchange, const std::string& type, @@ -108,10 +104,13 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations const qpid::framing::FieldTable& arguments); void delete_(uint16_t ticket, const std::string& exchange, bool ifUnused); - framing::ExchangeQueryResult query(u_int16_t ticket, const string& name); + framing::ExchangeQueryResult query(u_int16_t ticket, + const std::string& name); private: - void checkType(Exchange::shared_ptr exchange, const std::string& type); - void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate); + void checkType(shared_ptr exchange, const std::string& type); + + void checkAlternate(shared_ptr exchange, + shared_ptr alternate); }; class BindingHandlerImpl : @@ -119,7 +118,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { public: - BindingHandlerImpl(Session& session) : HandlerImpl(session) {} + BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {} framing::BindingQueryResult query(u_int16_t ticket, const std::string& exchange, @@ -133,7 +132,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { public: - QueueHandlerImpl(Session& session) : HandlerImpl(session) {} + QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {} void declare(uint16_t ticket, const std::string& queue, const std::string& alternateExchange, @@ -148,7 +147,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); - framing::QueueQueryResult query(const string& queue); + framing::QueueQueryResult query(const std::string& queue); void purge(uint16_t ticket, const std::string& queue); void delete_(uint16_t ticket, const std::string& queue, bool ifUnused, bool ifEmpty); @@ -159,9 +158,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { NameGenerator tagGenerator; - public: - BasicHandlerImpl(Session& session) : HandlerImpl(session), tagGenerator("sgen") {} + BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {} void qos(uint32_t prefetchSize, uint16_t prefetchCount, bool global); @@ -181,7 +179,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { public: - TxHandlerImpl(Session& session) : HandlerImpl(session) {} + TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {} void select(); void commit(); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ec6fd6ece7..b1b8abe4fd 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -23,7 +23,7 @@ #include #include "Connection.h" -#include "Session.h" +#include "SessionState.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "BrokerAdapter.h" #include "SemanticHandler.h" @@ -52,8 +52,7 @@ void Connection::received(framing::AMQFrame& frame){ if (frame.getChannel() == 0) { adapter.handle(frame); } else { - SessionHandler sa = getChannel(frame.getChannel()); - sa.in(frame); + getChannel(frame.getChannel()).in(frame); } } @@ -94,7 +93,7 @@ void Connection::closeChannel(uint16_t id) { if (i != channels.end()) channels.erase(i); } -SessionHandler Connection::getChannel(ChannelId id) { +SessionHandler& Connection::getChannel(ChannelId id) { boost::optional& ch = channels[id]; if (!ch) { ch = boost::in_place(boost::ref(*this), id); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 2723ac9acc..4f64873dc3 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -35,7 +35,6 @@ #include "qpid/framing/ProtocolVersion.h" #include "Broker.h" #include "qpid/Exception.h" -#include "Session.h" #include "ConnectionHandler.h" #include "SessionHandler.h" @@ -51,7 +50,7 @@ class Connection : public sys::ConnectionInputHandler, Connection(sys::ConnectionOutputHandler* out, Broker& broker); /** Get the SessionHandler for channel. Create if it does not already exist */ - SessionHandler getChannel(framing::ChannelId channel); + SessionHandler& getChannel(framing::ChannelId channel); /** Close the connection */ void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 9196fa71a0..36e6c22f88 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -20,7 +20,7 @@ */ #include "DeliveryRecord.h" #include "DeliverableMessage.h" -#include "Session.h" +#include "SemanticState.h" #include "BrokerExchange.h" #include "qpid/log/Statement.h" @@ -74,7 +74,7 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const return range->covers(id); } -void DeliveryRecord::redeliver(Session* const session) const{ +void DeliveryRecord::redeliver(SemanticState* const session) const{ if (!confirmed) { if(pull){ //if message was originally sent as response to get, we must requeue it diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 4d98b0c5da..3c833fcaa8 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -34,7 +34,7 @@ namespace qpid { namespace broker { -class Session; +class SemanticState; /** * Record of a delivery for which an ack is outstanding. @@ -61,7 +61,7 @@ class DeliveryRecord{ void requeue() const; void release(); void reject(); - void redeliver(Session* const) const; + void redeliver(SemanticState* const) const; void updateByteCredit(uint32_t& credit) const; void addTo(Prefetch&) const; void subtractFrom(Prefetch&) const; diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 7ed42d285b..5887d13f85 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -19,21 +19,20 @@ #include #include "Broker.h" -#include "Session.h" #include "qpid/framing/constants.h" using namespace qpid::broker; using namespace qpid::framing; using std::string; -DtxHandlerImpl::DtxHandlerImpl(Session& s) : HandlerImpl(s) {} +DtxHandlerImpl::DtxHandlerImpl(SemanticState& s) : HandlerImpl(s) {} // DtxDemarcationHandler: void DtxHandlerImpl::select() { - getSession().selectDtx(); + state.selectDtx(); } DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, @@ -43,7 +42,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, { try { if (fail) { - getSession().endDtx(xid, true); + state.endDtx(xid, true); if (suspend) { throw ConnectionException(503, "End and suspend cannot both be set."); } else { @@ -51,9 +50,9 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, } } else { if (suspend) { - getSession().suspendDtx(xid); + state.suspendDtx(xid); } else { - getSession().endDtx(xid, false); + state.endDtx(xid, false); } return DtxDemarcationEndResult(XA_OK); } @@ -72,9 +71,9 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, } try { if (resume) { - getSession().resumeDtx(xid); + state.resumeDtx(xid); } else { - getSession().startDtx(xid, getBroker().getDtxManager(), join); + state.startDtx(xid, getBroker().getDtxManager(), join); } return DtxDemarcationStartResult(XA_OK); } catch (const DtxTimeoutException& e) { diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index 7f8eaac335..5bc9d5142a 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -32,7 +32,7 @@ class DtxHandlerImpl public framing::AMQP_ServerOperations::DtxDemarcationHandler { public: - DtxHandlerImpl(Session&); + DtxHandlerImpl(SemanticState&); // DtxCoordinationHandler: @@ -57,8 +57,6 @@ public: void select(); framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume); - - }; diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h index c06188d3c0..0250805f52 100644 --- a/cpp/src/qpid/broker/HandlerImpl.h +++ b/cpp/src/qpid/broker/HandlerImpl.h @@ -19,9 +19,8 @@ * */ -#include "Session.h" -#include "SessionHandler.h" -#include "Connection.h" +#include "SemanticState.h" +#include "SessionState.h" namespace qpid { namespace broker { @@ -34,26 +33,14 @@ class Broker; */ class HandlerImpl { protected: - HandlerImpl(Session& s) : session(s) {} + SemanticState& state; + SessionState& session; - Session& getSession() { return session; } - const Session& getSession() const { return session; } - - SessionHandler* getSessionHandler() { return session.getHandler(); } - const SessionHandler* getSessionHandler() const { return session.getHandler(); } + HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} - // Remaining functions may only be called if getSessionHandler() != 0 - framing::AMQP_ClientProxy& getProxy() { return getSessionHandler()->getProxy(); } - const framing::AMQP_ClientProxy& getProxy() const { return getSessionHandler()->getProxy(); } - - Connection& getConnection() { return getSessionHandler()->getConnection(); } - const Connection& getConnection() const { return getSessionHandler()->getConnection(); } - - Broker& getBroker() { return getConnection().broker; } - const Broker& getBroker() const { return getConnection().broker; } - - private: - Session& session; + framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } + Connection& getConnection() { return session.getConnection(); } + Broker& getBroker() { return session.getBroker(); } }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index a31ac78aa4..3d197e185d 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -18,7 +18,6 @@ #include "qpid/QpidError.h" #include "MessageHandlerImpl.h" -#include "Session.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" #include "Broker.h" @@ -36,8 +35,7 @@ namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(Session& session) - : HandlerImpl(session) {} +MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {} // // Message class method handlers @@ -46,7 +44,7 @@ MessageHandlerImpl::MessageHandlerImpl(Session& session) void MessageHandlerImpl::cancel(const string& destination ) { - getSession().cancel(destination); + state.cancel(destination); } void @@ -97,14 +95,14 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, bool exclusive, const framing::FieldTable& filter ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); - if(!destination.empty() && getSession().exists(destination)) + Queue::shared_ptr queue = state.getQueue(queueName); + if(!destination.empty() && state.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; //NB: am assuming pre-acquired = 0 as discussed on SIG list as is //the previously expected behaviour - getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); @@ -117,9 +115,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& destination, bool noAck ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); - if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ + if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -148,14 +146,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, bool /*global*/ ) { //TODO: handle global - getSession().setPrefetchSize(prefetchSize); - getSession().setPrefetchCount(prefetchCount); + state.setPrefetchSize(prefetchSize); + state.setPrefetchCount(prefetchCount); } void MessageHandlerImpl::recover(bool requeue) { - getSession().recover(requeue); + state.recover(requeue); } void @@ -166,7 +164,7 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/ } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().reject(i->getValue(), (++i)->getValue()); + state.reject(i->getValue(), (++i)->getValue()); } } @@ -175,10 +173,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i if (unit == 0) { //message - getSession().addMessageCredit(destination, value); + state.addMessageCredit(destination, value); } else if (unit == 1) { //bytes - getSession().addByteCredit(destination, value); + state.addByteCredit(destination, value); } else { //unknown throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); @@ -190,10 +188,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) { if (mode == 0) { //credit - getSession().setCreditMode(destination); + state.setCreditMode(destination); } else if (mode == 1) { //window - getSession().setWindowMode(destination); + state.setWindowMode(destination); } else{ throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); } @@ -201,12 +199,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) void MessageHandlerImpl::flush(const std::string& destination) { - getSession().flush(destination); + state.flush(destination); } void MessageHandlerImpl::stop(const std::string& destination) { - getSession().stop(destination); + state.stop(destination); } void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) @@ -218,7 +216,7 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /* } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().acquire(i->getValue(), (++i)->getValue(), results); + state.acquire(i->getValue(), (++i)->getValue(), results); } results = results.condense(); @@ -232,7 +230,7 @@ void MessageHandlerImpl::release(const SequenceNumberSet& transfers) } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().release(i->getValue(), (++i)->getValue()); + state.release(i->getValue(), (++i)->getValue()); } } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h index e4d66428d1..d90159d4f7 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -37,7 +37,7 @@ class MessageHandlerImpl : public HandlerImpl { public: - MessageHandlerImpl(Session&); + MessageHandlerImpl(SemanticState&); void append(const std::string& reference, const std::string& bytes); @@ -87,8 +87,8 @@ class MessageHandlerImpl : void release(const framing::SequenceNumberSet& transfers); void subscribe(u_int16_t ticket, - const string& queue, - const string& destination, + const std::string& queue, + const std::string& destination, bool noLocal, u_int8_t confirmMode, u_int8_t acquireMode, diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f8d76c3b5f..0bb813ebfd 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -20,12 +20,12 @@ */ #include "SemanticHandler.h" -#include "Session.h" +#include "SemanticState.h" #include "SessionHandler.h" +#include "SessionState.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" #include "Connection.h" -#include "Session.h" #include "qpid/framing/ExecutionCompleteBody.h" #include "qpid/framing/ExecutionResultBody.h" #include "qpid/framing/InvocationVisitor.h" @@ -36,7 +36,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {} +SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {} void SemanticHandler::handle(framing::AMQFrame& frame) { @@ -79,13 +79,13 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran if (outgoing.lwm < mark) { outgoing.lwm = mark; //ack messages: - getSession().ackCumulative(mark.getValue()); + state.ackCumulative(mark.getValue()); } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } } @@ -95,9 +95,9 @@ void SemanticHandler::sendCompletion() SequenceNumber mark = incoming.getMark(); SequenceNumberSet range = incoming.getRange(); Mutex::ScopedLock l(outLock); - assert(getSessionHandler()); - getProxy().getExecution().complete(mark.getValue(), range); + session.getProxy().getExecution().complete(mark.getValue(), range); } + void SemanticHandler::flush() { incoming.flush(); @@ -122,7 +122,7 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { SequenceNumber id = incoming.next(); - BrokerAdapter adapter(getSession()); + BrokerAdapter adapter(state); InvocationVisitor v(&adapter); method->accept(v); incoming.complete(id); @@ -130,7 +130,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) if (!v.wasHandled()) { throw ConnectionException(540, "Not implemented"); } else if (v.hasResult()) { - getProxy().getExecution().result(id.getValue(), v.getResult()); + session.getProxy().getExecution().result(id.getValue(), v.getResult()); } //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -152,8 +152,8 @@ void SemanticHandler::handleContent(AMQFrame& frame) } msgBuilder.handle(frame); if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags - msg->setPublisher(&getConnection()); - getSession().handle(msg); + msg->setPublisher(&session.getConnection()); + state.handle(msg); msgBuilder.end(); incoming.track(msg); //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); } @@ -163,13 +163,17 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax()); + MessageDelivery::deliver( + msg, session.getHandler().out, + ++outgoing.hwm, token, + session.getConnection().getFrameMax()); return outgoing.hwm; } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) { - MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax()); + MessageDelivery::deliver(msg, session.getHandler().out, tag, token, + session.getConnection().getFrameMax()); } SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 4b3a05ba19..d6dbf878c9 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -44,13 +44,17 @@ class AMQHeaderBody; namespace broker { -class Session; +class SessionState; class SemanticHandler : public DeliveryAdapter, - public framing::FrameHandler, - public framing::AMQP_ServerOperations::ExecutionHandler, - private HandlerImpl + public framing::FrameHandler, + public framing::AMQP_ServerOperations::ExecutionHandler + { + SemanticState state; + SessionState& session; + // FIXME aconway 2007-09-20: Why are these on the handler rather than the + // state? IncomingExecutionContext incoming; framing::Window outgoing; sys::Mutex outLock; @@ -69,8 +73,12 @@ class SemanticHandler : public DeliveryAdapter, DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag); + framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } + Connection& getConnection() { return session.getConnection(); } + Broker& getBroker() { return session.getBroker(); } + public: - SemanticHandler(Session& session); + SemanticHandler(SessionState& session); //frame handler: void handle(framing::AMQFrame& frame); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp new file mode 100644 index 0000000000..059f99077c --- /dev/null +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -0,0 +1,591 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SessionState.h" +#include "BrokerAdapter.h" +#include "BrokerQueue.h" +#include "Connection.h" +#include "DeliverableMessage.h" +#include "DtxAck.h" +#include "DtxTimeout.h" +#include "Message.h" +#include "SemanticHandler.h" +#include "SessionHandler.h" +#include "TxAck.h" +#include "TxPublish.h" +#include "qpid/QpidError.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" + +#include +#include + +#include +#include +#include +#include + +#include + + +namespace qpid { +namespace broker { + +using std::mem_fun_ref; +using std::bind2nd; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) + : session(ss), + deliveryAdapter(da), + prefetchSize(0), + prefetchCount(0), + tagGenerator("sgen"), + dtxSelected(false), + accumulatedAck(0), + flowActive(true) +{ + outstanding.reset(); +} + +SemanticState::~SemanticState() { + consumers.clear(); + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } + recover(true); +} + +bool SemanticState::exists(const string& consumerTag){ + return consumers.find(consumerTag) != consumers.end(); +} + +void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, + Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire, + bool exclusive, const FieldTable*) +{ + if(tagInOut.empty()) + tagInOut = tagGenerator.generate(); + std::auto_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); + queue->consume(c.get(), exclusive);//may throw exception + consumers.insert(tagInOut, c.release()); +} + +void SemanticState::cancel(const string& tag){ + // consumers is a ptr_map so erase will delete the consumer + // which will call cancel. + ConsumerImplMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + consumers.erase(i); +} + + +void SemanticState::startTx() +{ + txBuffer = TxBuffer::shared_ptr(new TxBuffer()); +} + +void SemanticState::commit(MessageStore* const store) +{ + if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); + + TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); + txBuffer->enlist(txAck); + if (txBuffer->commitLocal(store)) { + accumulatedAck.clear(); + } +} + +void SemanticState::rollback() +{ + if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); + + txBuffer->rollback(); + accumulatedAck.clear(); +} + +void SemanticState::selectDtx() +{ + dtxSelected = true; +} + +void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) +{ + if (!dtxSelected) { + throw ConnectionException(503, "Session has not been selected for use with dtx"); + } + dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); + txBuffer = static_pointer_cast(dtxBuffer); + if (join) { + mgr.join(xid, dtxBuffer); + } else { + mgr.start(xid, dtxBuffer); + } +} + +void SemanticState::endDtx(const std::string& xid, bool fail) +{ + if (!dtxBuffer) { + throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid); + } + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") + % dtxBuffer->getXid() % xid); + } + + txBuffer.reset();//ops on this session no longer transactional + + checkDtxTimeout(); + if (fail) { + dtxBuffer->fail(); + } else { + dtxBuffer->markEnded(); + } + dtxBuffer.reset(); +} + +void SemanticState::suspendDtx(const std::string& xid) +{ + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") + % dtxBuffer->getXid() % xid); + } + txBuffer.reset();//ops on this session no longer transactional + + checkDtxTimeout(); + dtxBuffer->setSuspended(true); +} + +void SemanticState::resumeDtx(const std::string& xid) +{ + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") + % dtxBuffer->getXid() % xid); + } + if (!dtxBuffer->isSuspended()) { + throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); + } + + checkDtxTimeout(); + dtxBuffer->setSuspended(false); + txBuffer = static_pointer_cast(dtxBuffer); +} + +void SemanticState::checkDtxTimeout() +{ + if (dtxBuffer->isExpired()) { + dtxBuffer.reset(); + throw DtxTimeoutException(); + } +} + +void SemanticState::record(const DeliveryRecord& delivery) +{ + unacked.push_back(delivery); + delivery.addTo(outstanding); +} + +bool SemanticState::checkPrefetch(Message::shared_ptr& msg) +{ + Mutex::ScopedLock locker(deliveryLock); + bool countOk = !prefetchCount || prefetchCount > unacked.size(); + bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); + return countOk && sizeOk; +} + +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + DeliveryToken::shared_ptr _token, + const string& _name, + Queue::shared_ptr _queue, + bool ack, + bool _nolocal, + bool _acquire + ) : + Consumer(_acquire), + parent(_parent), + token(_token), + name(_name), + queue(_queue), + ackExpected(ack), + nolocal(_nolocal), + acquire(_acquire), + blocked(false), + windowing(true), + msgCredit(0), + byteCredit(0) {} + +bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) +{ + if (nolocal && + &parent->getSession().getConnection() == msg.payload->getPublisher()) { + return false; + } else { + if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) { + blocked = true; + } else { + blocked = false; + + Mutex::ScopedLock locker(parent->deliveryLock); + + DeliveryId deliveryTag = + parent->deliveryAdapter.deliver(msg.payload, token); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected)); + } + } + return !blocked; + } +} + +bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg) +{ + Mutex::ScopedLock l(lock); + if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { + return false; + } else { + if (msgCredit != 0xFFFFFFFF) { + msgCredit--; + } + if (byteCredit != 0xFFFFFFFF) { + byteCredit -= msg->getRequiredCredit(); + } + return true; + } +} + +void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) { + Mutex::ScopedLock locker(parent->deliveryLock); + parent->deliveryAdapter.redeliver(msg, token, deliveryTag); +} + +SemanticState::ConsumerImpl::~ConsumerImpl() { + cancel(); +} + +void SemanticState::ConsumerImpl::cancel() +{ + if(queue) { + queue->cancel(this); + if (queue->canAutoDelete()) { + parent->getSession().getBroker().getQueues().destroyIf( + queue->getName(), + boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); + } + } +} + +void SemanticState::ConsumerImpl::requestDispatch() +{ + if(blocked) + queue->requestDispatch(this); +} + +void SemanticState::handle(Message::shared_ptr msg) { + if (txBuffer.get()) { + TxPublish* deliverable(new TxPublish(msg)); + TxOp::shared_ptr op(deliverable); + route(msg, *deliverable); + txBuffer->enlist(op); + } else { + DeliverableMessage deliverable(msg); + route(msg, deliverable); + } +} + +void SemanticState::route(Message::shared_ptr msg, Deliverable& strategy) { + std::string exchangeName = msg->getExchangeName(); + if (!cacheExchange || cacheExchange->getName() != exchangeName){ + cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName); + } + + cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); + + if (!strategy.delivered) { + //TODO:if reject-unroutable, then reject + //else route to alternate exchange + if (cacheExchange->getAlternate()) { + cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); + } + } + +} + +void SemanticState::ackCumulative(DeliveryId id) +{ + ack(id, id, true); +} + +void SemanticState::ackRange(DeliveryId first, DeliveryId last) +{ + ack(first, last, false); +} + +void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) +{ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator start = cumulative ? unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); + ack_iterator end = start; + + if (cumulative || first != last) { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } else { + //just acked single element (move end past it) + ++end; + } + + for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); + + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); + + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { + for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(start, end); + } + + //if the prefetch limit had previously been reached, or credit + //had expired in windowing mode there may be messages that can + //be now be delivered + for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); +} + +void SemanticState::acknowledged(const DeliveryRecord& delivery) +{ + delivery.subtractFrom(outstanding); + ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag()); + if (i != consumers.end()) { + i->acknowledged(delivery); + } +} + +void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +{ + if (windowing) { + if (msgCredit != 0xFFFFFFFF) msgCredit++; + if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); + } +} + +void SemanticState::recover(bool requeue) +{ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + if(requeue){ + outstanding.reset(); + //take copy and clear unacked as requeue may result in redelivery to this session + //which will in turn result in additions to unacked + std::list copy = unacked; + unacked.clear(); + for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); + }else{ + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); + } +} + +bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) +{ + QueuedMessage msg = queue->dequeue(); + if(msg.payload){ + Mutex::ScopedLock locker(deliveryLock); + DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg.payload, token); + if(ackExpected){ + unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); + } + return true; + }else{ + return false; + } +} + +void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag, + DeliveryId deliveryTag) +{ + ConsumerImplMap::iterator i = consumers.find(consumerTag); + if (i != consumers.end()){ + i->redeliver(msg, deliveryTag); + } +} + +void SemanticState::flow(bool active) +{ + Mutex::ScopedLock locker(deliveryLock); + bool requestDelivery(!flowActive && active); + flowActive = active; + if (requestDelivery) { + //there may be messages that can be now be delivered + std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); + } +} + + +SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +{ + ConsumerImplMap::iterator i = consumers.find(destination); + if (i == consumers.end()) { + throw NotFoundException(QPID_MSG("Unknown destination " << destination)); + } else { + return *i; + } +} + +void SemanticState::setWindowMode(const std::string& destination) +{ + find(destination).setWindowMode(); +} + +void SemanticState::setCreditMode(const std::string& destination) +{ + find(destination).setCreditMode(); +} + +void SemanticState::addByteCredit(const std::string& destination, uint32_t value) +{ + find(destination).addByteCredit(value); +} + + +void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) +{ + find(destination).addMessageCredit(value); +} + +void SemanticState::flush(const std::string& destination) +{ + ConsumerImpl& c = find(destination); + c.flush(); +} + + +void SemanticState::stop(const std::string& destination) +{ + find(destination).stop(); +} + +void SemanticState::ConsumerImpl::setWindowMode() +{ + windowing = true; +} + +void SemanticState::ConsumerImpl::setCreditMode() +{ + windowing = false; +} + +void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) +{ + byteCredit += value; + requestDispatch(); +} + +void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) +{ + msgCredit += value; + requestDispatch(); +} + +void SemanticState::ConsumerImpl::flush() +{ + //need to prevent delivery after requestDispatch returns but + //before credit is reduced to zero; TODO: come up with better + //implementation of flush. + Mutex::ScopedLock l(lock); + queue->requestDispatch(this, true); + byteCredit = 0; + msgCredit = 0; +} + +void SemanticState::ConsumerImpl::stop() +{ + msgCredit = 0; + byteCredit = 0; +} + +Queue::shared_ptr SemanticState::getQueue(const string& name) const { + //Note: this can be removed soon as the default queue for sessions is scrapped in 0-10 + Queue::shared_ptr queue; + if (name.empty()) { + queue = getDefaultQueue(); + if (!queue) + throw NotAllowedException(QPID_MSG("No queue name specified.")); + } + else { + queue = session.getBroker().getQueues().find(name); + if (!queue) + throw NotFoundException(QPID_MSG("Queue not found: "<& acquired) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, AcquireFunctor(acquired)); +} + +void SemanticState::release(DeliveryId first, DeliveryId last) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release)); +} + +void SemanticState::reject(DeliveryId first, DeliveryId last) +{ + Mutex::ScopedLock locker(deliveryLock); + AckRange range = findRange(first, last); + for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); + //need to remove the delivery records as well + unacked.erase(range.start, range.end); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h new file mode 100644 index 0000000000..6147380714 --- /dev/null +++ b/cpp/src/qpid/broker/SemanticState.h @@ -0,0 +1,184 @@ +#ifndef QPID_BROKER_SEMANTICSTATE_H +#define QPID_BROKER_SEMANTICSTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Consumer.h" +#include "Deliverable.h" +#include "DeliveryAdapter.h" +#include "DeliveryRecord.h" +#include "DeliveryToken.h" +#include "DtxBuffer.h" +#include "DtxManager.h" +#include "NameGenerator.h" +#include "Prefetch.h" +#include "TxBuffer.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/Uuid.h" +#include "qpid/shared_ptr.h" + +#include + +#include +#include + +namespace qpid { +namespace broker { + +class SessionState; + +/** + * SemanticState holds the L3 and L4 state of an open session, whether + * attached to a channel or suspended. + */ +class SemanticState : public framing::FrameHandler::Chains, + private boost::noncopyable +{ + class ConsumerImpl : public Consumer + { + sys::Mutex lock; + SemanticState* const parent; + const DeliveryToken::shared_ptr token; + const string name; + const Queue::shared_ptr queue; + const bool ackExpected; + const bool nolocal; + const bool acquire; + bool blocked; + bool windowing; + uint32_t msgCredit; + uint32_t byteCredit; + + bool checkCredit(Message::shared_ptr& msg); + + public: + ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, + const string& name, Queue::shared_ptr queue, + bool ack, bool nolocal, bool acquire); + ~ConsumerImpl(); + bool deliver(QueuedMessage& msg); + void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag); + void cancel(); + void requestDispatch(); + + void setWindowMode(); + void setCreditMode(); + void addByteCredit(uint32_t value); + void addMessageCredit(uint32_t value); + void flush(); + void stop(); + void acknowledged(const DeliveryRecord&); + }; + + typedef boost::ptr_map ConsumerImplMap; + + SessionState& session; + DeliveryAdapter& deliveryAdapter; + Queue::shared_ptr defaultQueue; + ConsumerImplMap consumers; + uint32_t prefetchSize; + uint16_t prefetchCount; + Prefetch outstanding; + NameGenerator tagGenerator; + std::list unacked; + sys::Mutex deliveryLock; + TxBuffer::shared_ptr txBuffer; + DtxBuffer::shared_ptr dtxBuffer; + bool dtxSelected; + framing::AccumulatedAck accumulatedAck; + bool flowActive; + + boost::shared_ptr cacheExchange; + + void route(Message::shared_ptr msg, Deliverable& strategy); + void record(const DeliveryRecord& delivery); + bool checkPrefetch(Message::shared_ptr& msg); + void checkDtxTimeout(); + ConsumerImpl& find(const std::string& destination); + void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); + void acknowledged(const DeliveryRecord&); + AckRange findRange(DeliveryId first, DeliveryId last); + + public: + SemanticState(DeliveryAdapter&, SessionState&); + ~SemanticState(); + + SessionState& getSession() { return session; } + + /** + * Get named queue, never returns 0. + * @return: named queue or default queue for session if name="" + * @exception: ChannelException if no queue of that name is found. + * @exception: ConnectionException if name="" and session has no default. + */ + Queue::shared_ptr getQueue(const std::string& name) const; + + + void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } + Queue::shared_ptr getDefaultQueue() const { return defaultQueue; } + uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; } + uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; } + + bool exists(const string& consumerTag); + + /** + *@param tagInOut - if empty it is updated with the generated token. + */ + void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, + bool nolocal, bool acks, bool acquire, bool exclusive, const framing::FieldTable* = 0); + + void cancel(const string& tag); + + void setWindowMode(const std::string& destination); + void setCreditMode(const std::string& destination); + void addByteCredit(const std::string& destination, uint32_t value); + void addMessageCredit(const std::string& destination, uint32_t value); + void flush(const std::string& destination); + void stop(const std::string& destination); + + bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); + void startTx(); + void commit(MessageStore* const store); + void rollback(); + void selectDtx(); + void startDtx(const std::string& xid, DtxManager& mgr, bool join); + void endDtx(const std::string& xid, bool fail); + void suspendDtx(const std::string& xid); + void resumeDtx(const std::string& xid); + void ackCumulative(DeliveryId deliveryTag); + void ackRange(DeliveryId deliveryTag, DeliveryId endTag); + void recover(bool requeue); + void flow(bool active); + void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); + void acquire(DeliveryId first, DeliveryId last, std::vector& acquired); + void release(DeliveryId first, DeliveryId last); + void reject(DeliveryId first, DeliveryId last); + void handle(Message::shared_ptr msg); +}; + +}} // namespace qpid::broker + + + + +#endif /*!QPID_BROKER_SEMANTICSTATE_H*/ diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp deleted file mode 100644 index d379b40d3f..0000000000 --- a/cpp/src/qpid/broker/Session.cpp +++ /dev/null @@ -1,608 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Session.h" - -#include "BrokerAdapter.h" -#include "BrokerQueue.h" -#include "Connection.h" -#include "DeliverableMessage.h" -#include "DtxAck.h" -#include "DtxTimeout.h" -#include "Message.h" -#include "SemanticHandler.h" -#include "SessionHandler.h" -#include "TxAck.h" -#include "TxPublish.h" -#include "qpid/QpidError.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" - -#include -#include - -#include -#include -#include -#include - -#include - - -namespace qpid { -namespace broker { - -using std::mem_fun_ref; -using std::bind2nd; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -Session::Session(SessionHandler& a, uint32_t t) - : adapter(&a), - broker(adapter->getConnection().broker), - timeout(t), - id(true), - prefetchSize(0), - prefetchCount(0), - tagGenerator("sgen"), - dtxSelected(false), - accumulatedAck(0), - flowActive(true) -{ - outstanding.reset(); - std::auto_ptr semantic(new SemanticHandler(*this)); - // FIXME aconway 2007-08-29: move deliveryAdapter to SemanticHandlerState. - deliveryAdapter=semantic.get(); - handlers.push_back(semantic.release()); - in = &handlers[0]; - out = &adapter->out; - // FIXME aconway 2007-08-31: handlerupdater->sessionupdater, - // create a SessionManager in the broker for all session related - // stuff: suspended sessions, handler updaters etc. - // FIXME aconway 2007-08-31: Shouldn't be passing channel ID - broker.update(a.getChannel(), *this); -} - -Session::~Session() { - close(); -} - -bool Session::exists(const string& consumerTag){ - return consumers.find(consumerTag) != consumers.end(); -} - -void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire, - bool exclusive, const FieldTable*) -{ - if(tagInOut.empty()) - tagInOut = tagGenerator.generate(); - std::auto_ptr c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal, acquire)); - queue->consume(c.get(), exclusive);//may throw exception - consumers.insert(tagInOut, c.release()); -} - -void Session::cancel(const string& tag){ - // consumers is a ptr_map so erase will delete the consumer - // which will call cancel. - ConsumerImplMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - consumers.erase(i); -} - -void Session::close() -{ - opened = false; - consumers.clear(); - if (dtxBuffer.get()) { - dtxBuffer->fail(); - } - recover(true); -} - -void Session::startTx() -{ - txBuffer = TxBuffer::shared_ptr(new TxBuffer()); -} - -void Session::commit(MessageStore* const store) -{ - if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); - - TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); - txBuffer->enlist(txAck); - if (txBuffer->commitLocal(store)) { - accumulatedAck.clear(); - } -} - -void Session::rollback() -{ - if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); - - txBuffer->rollback(); - accumulatedAck.clear(); -} - -void Session::selectDtx() -{ - dtxSelected = true; -} - -void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join) -{ - if (!dtxSelected) { - throw ConnectionException(503, "Session has not been selected for use with dtx"); - } - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); - txBuffer = static_pointer_cast(dtxBuffer); - if (join) { - mgr.join(xid, dtxBuffer); - } else { - mgr.start(xid, dtxBuffer); - } -} - -void Session::endDtx(const std::string& xid, bool fail) -{ - if (!dtxBuffer) { - throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid); - } - if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") - % dtxBuffer->getXid() % xid); - } - - txBuffer.reset();//ops on this session no longer transactional - - checkDtxTimeout(); - if (fail) { - dtxBuffer->fail(); - } else { - dtxBuffer->markEnded(); - } - dtxBuffer.reset(); -} - -void Session::suspendDtx(const std::string& xid) -{ - if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") - % dtxBuffer->getXid() % xid); - } - txBuffer.reset();//ops on this session no longer transactional - - checkDtxTimeout(); - dtxBuffer->setSuspended(true); -} - -void Session::resumeDtx(const std::string& xid) -{ - if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") - % dtxBuffer->getXid() % xid); - } - if (!dtxBuffer->isSuspended()) { - throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); - } - - checkDtxTimeout(); - dtxBuffer->setSuspended(false); - txBuffer = static_pointer_cast(dtxBuffer); -} - -void Session::checkDtxTimeout() -{ - if (dtxBuffer->isExpired()) { - dtxBuffer.reset(); - throw DtxTimeoutException(); - } -} - -void Session::record(const DeliveryRecord& delivery) -{ - unacked.push_back(delivery); - delivery.addTo(outstanding); -} - -bool Session::checkPrefetch(Message::shared_ptr& msg) -{ - Mutex::ScopedLock locker(deliveryLock); - bool countOk = !prefetchCount || prefetchCount > unacked.size(); - bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); - return countOk && sizeOk; -} - -Session::ConsumerImpl::ConsumerImpl(Session* _parent, - DeliveryToken::shared_ptr _token, - const string& _name, - Queue::shared_ptr _queue, - bool ack, - bool _nolocal, - bool _acquire - ) : - Consumer(_acquire), - parent(_parent), - token(_token), - name(_name), - queue(_queue), - ackExpected(ack), - nolocal(_nolocal), - acquire(_acquire), - blocked(false), - windowing(true), - msgCredit(0), - byteCredit(0) {} - -bool Session::ConsumerImpl::deliver(QueuedMessage& msg) -{ - if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) { - return false; - } else { - if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) { - blocked = true; - } else { - blocked = false; - - Mutex::ScopedLock locker(parent->deliveryLock); - - DeliveryId deliveryTag = - parent->deliveryAdapter->deliver(msg.payload, token); - if (windowing || ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected)); - } - } - return !blocked; - } -} - -bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg) -{ - Mutex::ScopedLock l(lock); - if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - return false; - } else { - if (msgCredit != 0xFFFFFFFF) { - msgCredit--; - } - if (byteCredit != 0xFFFFFFFF) { - byteCredit -= msg->getRequiredCredit(); - } - return true; - } -} - -void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) { - Mutex::ScopedLock locker(parent->deliveryLock); - parent->deliveryAdapter->redeliver(msg, token, deliveryTag); -} - -Session::ConsumerImpl::~ConsumerImpl() { - cancel(); -} - -void Session::ConsumerImpl::cancel() -{ - if(queue) { - queue->cancel(this); - if (queue->canAutoDelete()) { - parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(), - boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); - } - } -} - -void Session::ConsumerImpl::requestDispatch() -{ - if(blocked) - queue->requestDispatch(this); -} - -void Session::handle(Message::shared_ptr msg) { - if (txBuffer.get()) { - TxPublish* deliverable(new TxPublish(msg)); - TxOp::shared_ptr op(deliverable); - route(msg, *deliverable); - txBuffer->enlist(op); - } else { - DeliverableMessage deliverable(msg); - route(msg, deliverable); - } -} - -void Session::route(Message::shared_ptr msg, Deliverable& strategy) { - std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName){ - cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName); - } - - cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); - - if (!strategy.delivered) { - //TODO:if reject-unroutable, then reject - //else route to alternate exchange - if (cacheExchange->getAlternate()) { - cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); - } - } - -} - -void Session::ackCumulative(DeliveryId id) -{ - ack(id, id, true); -} - -void Session::ackRange(DeliveryId first, DeliveryId last) -{ - ack(first, last, false); -} - -void Session::ack(DeliveryId first, DeliveryId last, bool cumulative) -{ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator start = cumulative ? unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); - ack_iterator end = start; - - if (cumulative || first != last) { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); - } else { - //just acked single element (move end past it) - ++end; - } - - for_each(start, end, boost::bind(&Session::acknowledged, this, _1)); - - if (txBuffer.get()) { - //in transactional mode, don't dequeue or remove, just - //maintain set of acknowledged messages: - accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); - - if (dtxBuffer.get()) { - //if enlisted in a dtx, remove the relevant slice from - //unacked and record it against that transaction - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); - } - } else { - for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); - unacked.erase(start, end); - } - - //if the prefetch limit had previously been reached, or credit - //had expired in windowing mode there may be messages that can - //be now be delivered - for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); -} - -void Session::acknowledged(const DeliveryRecord& delivery) -{ - delivery.subtractFrom(outstanding); - ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag()); - if (i != consumers.end()) { - i->acknowledged(delivery); - } -} - -void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) -{ - if (windowing) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); - } -} - -void Session::recover(bool requeue) -{ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - if(requeue){ - outstanding.reset(); - //take copy and clear unacked as requeue may result in redelivery to this session - //which will in turn result in additions to unacked - std::list copy = unacked; - unacked.clear(); - for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); - }else{ - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); - } -} - -bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) -{ - QueuedMessage msg = queue->dequeue(); - if(msg.payload){ - Mutex::ScopedLock locker(deliveryLock); - DeliveryId myDeliveryTag = deliveryAdapter->deliver(msg.payload, token); - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); - } - return true; - }else{ - return false; - } -} - -void Session::deliver(Message::shared_ptr& msg, const string& consumerTag, - DeliveryId deliveryTag) -{ - ConsumerImplMap::iterator i = consumers.find(consumerTag); - if (i != consumers.end()){ - i->redeliver(msg, deliveryTag); - } -} - -void Session::flow(bool active) -{ - Mutex::ScopedLock locker(deliveryLock); - bool requestDelivery(!flowActive && active); - flowActive = active; - if (requestDelivery) { - //there may be messages that can be now be delivered - std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); - } -} - - -Session::ConsumerImpl& Session::find(const std::string& destination) -{ - ConsumerImplMap::iterator i = consumers.find(destination); - if (i == consumers.end()) { - throw NotFoundException(QPID_MSG("Unknown destination " << destination)); - } else { - return *i; - } -} - -void Session::setWindowMode(const std::string& destination) -{ - find(destination).setWindowMode(); -} - -void Session::setCreditMode(const std::string& destination) -{ - find(destination).setCreditMode(); -} - -void Session::addByteCredit(const std::string& destination, uint32_t value) -{ - find(destination).addByteCredit(value); -} - - -void Session::addMessageCredit(const std::string& destination, uint32_t value) -{ - find(destination).addMessageCredit(value); -} - -void Session::flush(const std::string& destination) -{ - ConsumerImpl& c = find(destination); - c.flush(); -} - - -void Session::stop(const std::string& destination) -{ - find(destination).stop(); -} - -void Session::ConsumerImpl::setWindowMode() -{ - windowing = true; -} - -void Session::ConsumerImpl::setCreditMode() -{ - windowing = false; -} - -void Session::ConsumerImpl::addByteCredit(uint32_t value) -{ - byteCredit += value; - requestDispatch(); -} - -void Session::ConsumerImpl::addMessageCredit(uint32_t value) -{ - msgCredit += value; - requestDispatch(); -} - -void Session::ConsumerImpl::flush() -{ - //need to prevent delivery after requestDispatch returns but - //before credit is reduced to zero; TODO: come up with better - //implementation of flush. - Mutex::ScopedLock l(lock); - queue->requestDispatch(this, true); - byteCredit = 0; - msgCredit = 0; -} - -void Session::ConsumerImpl::stop() -{ - msgCredit = 0; - byteCredit = 0; -} - -Queue::shared_ptr Session::getQueue(const string& name) const { - //Note: this can be removed soon as the default queue for sessions is scrapped in 0-10 - Queue::shared_ptr queue; - if (name.empty()) { - queue = getDefaultQueue(); - if (!queue) - throw NotAllowedException(QPID_MSG("No queue name specified.")); - } - else { - queue = getBroker().getQueues().find(name); - if (!queue) - throw NotFoundException(QPID_MSG("Queue not found: "<& acquired) -{ - Mutex::ScopedLock locker(deliveryLock); - AckRange range = findRange(first, last); - for_each(range.start, range.end, AcquireFunctor(acquired)); -} - -void Session::release(DeliveryId first, DeliveryId last) -{ - Mutex::ScopedLock locker(deliveryLock); - AckRange range = findRange(first, last); - for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release)); -} - -void Session::reject(DeliveryId first, DeliveryId last) -{ - Mutex::ScopedLock locker(deliveryLock); - AckRange range = findRange(first, last); - for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); - //need to remove the delivery records as well - unacked.erase(range.start, range.end); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h deleted file mode 100644 index 80f1159f04..0000000000 --- a/cpp/src/qpid/broker/Session.h +++ /dev/null @@ -1,201 +0,0 @@ -#ifndef QPID_BROKER_SESSION_H -#define QPID_BROKER_SESSION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Consumer.h" -#include "Deliverable.h" -#include "DeliveryAdapter.h" -#include "DeliveryRecord.h" -#include "DeliveryToken.h" -#include "DtxBuffer.h" -#include "DtxManager.h" -#include "NameGenerator.h" -#include "Prefetch.h" -#include "TxBuffer.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/AccumulatedAck.h" -#include "qpid/framing/Uuid.h" -#include "qpid/shared_ptr.h" - -#include - -#include -#include - -namespace qpid { -namespace broker { - -class SessionHandler; -class Broker; - -/** - * Session holds the state of an open session, whether attached to a - * channel or suspended. It also holds the handler chains associated - * with the session. - */ -class Session : public framing::FrameHandler::Chains, - private boost::noncopyable -{ - class ConsumerImpl : public Consumer - { - sys::Mutex lock; - Session* const parent; - const DeliveryToken::shared_ptr token; - const string name; - const Queue::shared_ptr queue; - const bool ackExpected; - const bool nolocal; - const bool acquire; - bool blocked; - bool windowing; - uint32_t msgCredit; - uint32_t byteCredit; - - bool checkCredit(Message::shared_ptr& msg); - - public: - ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token, - const string& name, Queue::shared_ptr queue, - bool ack, bool nolocal, bool acquire); - ~ConsumerImpl(); - bool deliver(QueuedMessage& msg); - void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag); - void cancel(); - void requestDispatch(); - - void setWindowMode(); - void setCreditMode(); - void addByteCredit(uint32_t value); - void addMessageCredit(uint32_t value); - void flush(); - void stop(); - void acknowledged(const DeliveryRecord&); - }; - - typedef boost::ptr_map ConsumerImplMap; - - SessionHandler* adapter; - Broker& broker; - uint32_t timeout; - framing::Uuid id; - boost::ptr_vector handlers; - - DeliveryAdapter* deliveryAdapter; - Queue::shared_ptr defaultQueue; - ConsumerImplMap consumers; - uint32_t prefetchSize; - uint16_t prefetchCount; - Prefetch outstanding; - NameGenerator tagGenerator; - std::list unacked; - sys::Mutex deliveryLock; - TxBuffer::shared_ptr txBuffer; - DtxBuffer::shared_ptr dtxBuffer; - bool dtxSelected; - framing::AccumulatedAck accumulatedAck; - bool opened; - bool flowActive; - - boost::shared_ptr cacheExchange; - - void route(Message::shared_ptr msg, Deliverable& strategy); - void record(const DeliveryRecord& delivery); - bool checkPrefetch(Message::shared_ptr& msg); - void checkDtxTimeout(); - ConsumerImpl& find(const std::string& destination); - void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); - void acknowledged(const DeliveryRecord&); - AckRange findRange(DeliveryId first, DeliveryId last); - - public: - Session(SessionHandler&, uint32_t timeout); - ~Session(); - - /** Returns 0 if this session is not currently attached */ - SessionHandler* getHandler() { return adapter; } - const SessionHandler* getHandler() const { return adapter; } - - Broker& getBroker() const { return broker; } - - /** Session timeout, aka detached-lifetime. */ - uint32_t getTimeout() const { return timeout; } - /** Session ID */ - const framing::Uuid& getId() const { return id; } - - /** - * Get named queue, never returns 0. - * @return: named queue or default queue for session if name="" - * @exception: ChannelException if no queue of that name is found. - * @exception: ConnectionException if name="" and session has no default. - */ - Queue::shared_ptr getQueue(const std::string& name) const; - - - void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } - Queue::shared_ptr getDefaultQueue() const { return defaultQueue; } - uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; } - uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; } - - bool exists(const string& consumerTag); - - /** - *@param tagInOut - if empty it is updated with the generated token. - */ - void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, - bool nolocal, bool acks, bool acquire, bool exclusive, const framing::FieldTable* = 0); - - void cancel(const string& tag); - - void setWindowMode(const std::string& destination); - void setCreditMode(const std::string& destination); - void addByteCredit(const std::string& destination, uint32_t value); - void addMessageCredit(const std::string& destination, uint32_t value); - void flush(const std::string& destination); - void stop(const std::string& destination); - - bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); - void close(); - void startTx(); - void commit(MessageStore* const store); - void rollback(); - void selectDtx(); - void startDtx(const std::string& xid, DtxManager& mgr, bool join); - void endDtx(const std::string& xid, bool fail); - void suspendDtx(const std::string& xid); - void resumeDtx(const std::string& xid); - void ackCumulative(DeliveryId deliveryTag); - void ackRange(DeliveryId deliveryTag, DeliveryId endTag); - void recover(bool requeue); - void flow(bool active); - void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); - void acquire(DeliveryId first, DeliveryId last, std::vector& acquired); - void release(DeliveryId first, DeliveryId last); - void reject(DeliveryId first, DeliveryId last); - void handle(Message::shared_ptr msg); -}; - -}} // namespace qpid::broker - - - -#endif /*!QPID_BROKER_SESSION_H*/ diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 01ce88059a..13e5c247be 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -19,7 +19,7 @@ */ #include "SessionHandler.h" -#include "Session.h" +#include "SessionState.h" #include "Connection.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" @@ -94,7 +94,7 @@ void SessionHandler::assertClosed(const char* method) { void SessionHandler::open(uint32_t detachedLifetime) { assertClosed("open"); - session.reset(new Session(*this, detachedLifetime)); + session.reset(new SessionState(*this, detachedLifetime)); getProxy().getSession().attached(session->getId(), session->getTimeout()); } diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index a9c0f69985..5ae5b5cfee 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -31,7 +31,7 @@ namespace qpid { namespace broker { class Connection; -class Session; +class SessionState; /** * A SessionHandler is associated with each active channel. It @@ -48,8 +48,8 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, ~SessionHandler(); /** Returns 0 if not attached to a session */ - Session* getSession() { return session.get(); } - const Session* getSession() const { return session.get(); } + SessionState* getSession() { return session.get(); } + const SessionState* getSession() const { return session.get(); } framing::ChannelId getChannel() const { return channel; } @@ -84,7 +84,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, Connection& connection; const framing::ChannelId channel; framing::AMQP_ClientProxy proxy; - shared_ptr session; + shared_ptr session; bool ignoring; }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp new file mode 100644 index 0000000000..acfb3bfea8 --- /dev/null +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionState.h" +#include "SessionHandler.h" +#include "Connection.h" +#include "Broker.h" +#include "SemanticHandler.h" + +namespace qpid { +namespace broker { + +using namespace framing; + +SessionState::SessionState(SessionHandler& h, uint32_t timeout_) + : handler(&h), id(true), timeout(timeout_), + broker(h.getConnection().broker), + version(h.getConnection().getVersion()) +{ + // FIXME aconway 2007-09-21: Break dependnecy - broker updates session. + chain.push_back(new SemanticHandler(*this)); + in = &chain[0]; // Incoming frame to handler chain. + out = &handler->out; // Outgoing frames to SessionHandler + + // FIXME aconway 2007-09-20: use broker to add plugin + // handlers to the chain. + // FIXME aconway 2007-08-31: Shouldn't be passing channel ID. + broker.update(handler->getChannel(), *this); +} + +SessionHandler& SessionState::getHandler() { + assert(isAttached()); + return *handler; +} + +AMQP_ClientProxy& SessionState::getProxy() { + return getHandler().getProxy(); +} + /** Convenience for: getHandler()->getConnection() + *@pre getHandler() != 0 + */ +Connection& SessionState::getConnection() { + return getHandler().getConnection(); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 7558ea7866..1334cc7005 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -23,44 +23,73 @@ */ #include "qpid/framing/Uuid.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/ProtocolVersion.h" + +#include +#include + namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + namespace broker { +class SessionHandler; +class Broker; +class Connection; + /** * State of a session. + * + * An attached session has a SessionHandler which is attached to a + * connection. A suspended session has no handler. + * + * A SessionState is always associated with an open session (attached or + * suspended) it is destroyed when the session is closed. + * + * The SessionState includes the sessions handler chains, which may + * themselves have state. The handlers will be preserved as long as + * the session is alive. */ -class SessionState +class SessionState : public framing::FrameHandler::Chains, + private boost::noncopyable { public: - enum State { CLOSED, ACTIVE, SUSPENDED }; + /** SessionState for a newly opened connection. */ + SessionState(SessionHandler& h, uint32_t timeout_); - /** Initially in CLOSED state */ - SessionState() : id(false), state(CLOSED), timeout(0) {} + bool isAttached() { return handler; } - /** Make CLOSED session ACTIVE, assigns a new UUID. - * #@param timeout in seconds - */ - void open(u_int32_t timeout_) { - state=ACTIVE; id.generate(); timeout=timeout_; - } + /** @pre isAttached() */ + SessionHandler& getHandler(); - /** Close a session. */ - void close() { state=CLOSED; id.clear(); timeout=0; } + /** @pre isAttached() */ + framing::AMQP_ClientProxy& getProxy(); + + /** @pre isAttached() */ + Connection& getConnection(); - State getState() const { return state; } const framing::Uuid& getId() const { return id; } uint32_t getTimeout() const { return timeout; } - - bool isOpen() { return state == ACTIVE; } - bool isClosed() { return state == CLOSED; } - bool isSuspended() { return state == SUSPENDED; } + Broker& getBroker() { return broker; } + framing::ProtocolVersion getVersion() const { return version; } + private: - friend class SuspendedSessions; + friend class SessionHandler; // Only SessionHandler can attach/detach + void detach() { handler=0; } + void attach(SessionHandler& h) { handler = &h; } + + SessionHandler* handler; framing::Uuid id; - State state; uint32_t timeout; + Broker& broker; + boost::ptr_vector chain; + framing::ProtocolVersion version; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SuspendedSessions.h b/cpp/src/qpid/broker/SuspendedSessions.h index 03c5df27ed..d3a0c17050 100644 --- a/cpp/src/qpid/broker/SuspendedSessions.h +++ b/cpp/src/qpid/broker/SuspendedSessions.h @@ -31,8 +31,10 @@ namespace qpid { namespace broker { -/** Collection of suspended sessions. - * Thread safe. +/** + * Thread safe collection of suspended sessions. + * Every session is owned either by a connection's SessionHandler + * or by the SuspendedSessions. */ class SuspendedSessions { typedef std::multimap Map; -- cgit v1.2.1