diff options
| author | Alan Conway <aconway@apache.org> | 2007-09-18 19:43:29 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-09-18 19:43:29 +0000 |
| commit | 6aeb03f0f5ac7ede957995fc784367a30920c683 (patch) | |
| tree | 7fe35f0ce9fe6bf17dbd6416deb6069ef9c7b07c /cpp/src/qpid | |
| parent | 8b039e1ed4e4340917d7fd3d8202049e691ca6ec (diff) | |
| download | qpid-python-6aeb03f0f5ac7ede957995fc784367a30920c683.tar.gz | |
Refactor HandlerImpl to use Session rather than CoreRefs.
Remove most uses of ChannelAdapter in broker code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577027 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
22 files changed, 198 insertions, 307 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 35a87784d2..c266b36dfb 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -38,42 +38,35 @@ typedef std::vector<Queue::shared_ptr> QueueVector; // by the handlers responsible for those classes. // -BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) : - CoreRefs(s, - s.getAdapter()->getConnection(), - s.getAdapter()->getConnection().broker, - a), - basicHandler(*this), - exchangeHandler(*this), - bindingHandler(*this), - messageHandler(*this), - queueHandler(*this), - txHandler(*this), - dtxHandler(*this) +BrokerAdapter::BrokerAdapter(Session& s) : + HandlerImpl(s), + basicHandler(s), + exchangeHandler(s), + bindingHandler(s), + messageHandler(s), + queueHandler(s), + txHandler(s), + dtxHandler(s) {} -ProtocolVersion BrokerAdapter::getVersion() const { - return connection.getVersion(); -} - void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, const string& alternateExchange, bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { - alternate = broker.getExchanges().get(alternateExchange); + alternate = getBroker().getExchanges().get(alternateExchange); } if(passive){ - Exchange::shared_ptr actual(broker.getExchanges().get(exchange)); + Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); checkType(actual, type); checkAlternate(actual, alternate); }else{ try{ - std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args); + std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); if (response.second) { if (durable) { - broker.getStore().create(*response.first); + getBroker().getStore().create(*response.first); } if (alternate) { response.first->setAlternate(alternate); @@ -109,17 +102,17 @@ void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exc void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ //TODO: implement unused - Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange."); - if (exchange->isDurable()) broker.getStore().destroy(*exchange); + if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); - broker.getExchanges().destroy(name); + getBroker().getExchanges().destroy(name); } ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { - Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { return ExchangeQueryResult("", false, true, FieldTable()); @@ -134,12 +127,12 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ { Exchange::shared_ptr exchange; try { - exchange = broker.getExchanges().get(exchangeName); + exchange = getBroker().getExchanges().get(exchangeName); } catch (const ChannelException&) {} Queue::shared_ptr queue; if (!queueName.empty()) { - queue = broker.getQueues().find(queueName); + queue = getBroker().getQueues().find(queueName); } if (!exchange) { @@ -160,7 +153,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = session.getQueue(name); + Queue::shared_ptr queue = getSession().getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return QueueQueryResult(queue->getName(), @@ -179,22 +172,22 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { - alternate = broker.getExchanges().get(alternateExchange); + alternate = getBroker().getExchanges().get(alternateExchange); } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = session.getQueue(name); + queue = getSession().getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = - broker.getQueues().declare( + getBroker().getQueues().declare( name, durable, autoDelete && !exclusive, - exclusive ? &connection : 0); + exclusive ? &getConnection() : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - session.setDefaultQueue(queue); + getSession().setDefaultQueue(queue); if (alternate) { queue->setAlternateExchange(alternate); alternate->incAlternateUsers(); @@ -204,16 +197,16 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& queue_created.first->create(arguments); //add default binding: - broker.getExchanges().getDefault()->bind(queue, name, 0); - queue->bound(broker.getExchanges().getDefault()->getName(), name, arguments); + getBroker().getExchanges().getDefault()->bind(queue, name, 0); + queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); //handle automatic cleanup: if (exclusive) { - connection.exclusiveQueues.push_back(queue); + getConnection().exclusiveQueues.push_back(queue); } } } - if (exclusive && !queue->isExclusiveOwner(&connection)) + if (exclusive && !queue->isExclusiveOwner(&getConnection())) throw ResourceLockedException( QPID_MSG("Cannot grant exclusive access to queue " << queue->getName())); @@ -223,14 +216,14 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = session.getQueue(queueName); - Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); + Queue::shared_ptr queue = getSession().getQueue(queueName); + Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; if (exchange->bind(queue, exchangeRoutingKey, &arguments)) { queue->bound(exchangeName, routingKey, arguments); if (exchange->isDurable() && queue->isDurable()) { - broker.getStore().bind(*exchange, *queue, routingKey, arguments); + getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); } } }else{ @@ -246,38 +239,38 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, const string& routingKey, const qpid::framing::FieldTable& arguments ) { - Queue::shared_ptr queue = session.getQueue(queueName); + Queue::shared_ptr queue = getSession().getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); - Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); + Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) { - broker.getStore().unbind(*exchange, *queue, routingKey, arguments); + getBroker().getStore().unbind(*exchange, *queue, routingKey, arguments); } } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ - session.getQueue(queue)->purge(); + getSession().getQueue(queue)->purge(); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = session.getQueue(queue); + Queue::shared_ptr q = getSession().getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ throw PreconditionFailedException("Queue in use."); }else{ //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(&connection)){ - QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); - if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); + if(q->isExclusiveOwner(&getConnection())){ + QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q); + if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i); } q->destroy(); - broker.getQueues().destroy(queue); - q->unbind(broker.getExchanges(), q); + getBroker().getQueues().destroy(queue); + q->unbind(getBroker().getExchanges(), q); } } @@ -286,8 +279,8 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ //TODO: handle global - session.setPrefetchSize(prefetchSize); - session.setPrefetchCount(prefetchCount); + getSession().setPrefetchSize(prefetchSize); + getSession().setPrefetchCount(prefetchCount); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -296,8 +289,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = session.getQueue(queueName); - if(!consumerTag.empty() && session.exists(consumerTag)){ + Queue::shared_ptr queue = getSession().getQueue(queueName); + if(!consumerTag.empty() && getSession().exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } string newTag = consumerTag; @@ -305,33 +298,34 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); - session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); + getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); - if(!nowait) client.consumeOk(newTag); + if(!nowait) + getProxy().getBasic().consumeOk(newTag); //allow messages to be dispatched if required as there is now a consumer: queue->requestDispatch(); } void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ - session.cancel(consumerTag); + getSession().cancel(consumerTag); } void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = session.getQueue(queueName); + Queue::shared_ptr queue = getSession().getQueue(queueName); DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); - if(!session.get(token, queue, !noAck)){ + if(!getSession().get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack - client.getEmpty(clusterId); + getProxy().getBasic().getEmpty(clusterId); } } void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ if (multiple) { - session.ackCumulative(deliveryTag); + getSession().ackCumulative(deliveryTag); } else { - session.ackRange(deliveryTag, deliveryTag); + getSession().ackRange(deliveryTag, deliveryTag); } } @@ -339,23 +333,23 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) { - session.recover(requeue); + getSession().recover(requeue); } void BrokerAdapter::TxHandlerImpl::select() { - session.startTx(); + getSession().startTx(); } void BrokerAdapter::TxHandlerImpl::commit() { - session.commit(&broker.getStore()); + getSession().commit(&getBroker().getStore()); } void BrokerAdapter::TxHandlerImpl::rollback() { - session.rollback(); - session.recover(false); + getSession().rollback(); + getSession().recover(false); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index cc609be614..ec6b4aa0fc 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -19,7 +19,6 @@ * */ #include "DtxHandlerImpl.h" -#include "HandlerImpl.h" #include "MessageHandlerImpl.h" #include "NameGenerator.h" #include "qpid/Exception.h" @@ -55,18 +54,28 @@ class MessageHandlerImpl; * peer. * */ -class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations + +// TODO aconway 2007-09-18: BrokerAdapter is no longer an appropriate way +// to group methods as seen by the BADHANDLERs below. +// Handlers should be grouped by layer, the BrokerAdapter stuff +// belongs on the SemanticHandler. +// +class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations { public: - BrokerAdapter(Session& session, framing::ChannelAdapter& a); + BrokerAdapter(Session& session); - framing::ProtocolVersion getVersion() const; BasicHandler* getBasicHandler() { return &basicHandler; } ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } BindingHandler* getBindingHandler() { return &bindingHandler; } QueueHandler* getQueueHandler() { return &queueHandler; } TxHandler* getTxHandler() { return &txHandler; } MessageHandler* getMessageHandler() { return &messageHandler; } + DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } + DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } + framing::ProtocolVersion getVersion() const { return getConnection().getVersion(); } + + AccessHandler* getAccessHandler() { throw framing::NotImplementedException("Access class not implemented"); } FileHandler* getFileHandler() { @@ -75,26 +84,22 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations throw framing::NotImplementedException("Stream class not implemented"); } TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Tunnel class not implemented"); } - DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } - DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } - ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); } // Handlers no longer implemented in BrokerAdapter: #define BADHANDLER() assert(0); throw framing::InternalErrorException() + ExecutionHandler* getExecutionHandler() { BADHANDLER(); } ConnectionHandler* getConnectionHandler() { BADHANDLER(); } SessionHandler* getSessionHandler() { BADHANDLER(); } ChannelHandler* getChannelHandler() { BADHANDLER(); } #undef BADHANDLER - framing::AMQP_ClientProxy& getProxy() { return proxy; } - private: class ExchangeHandlerImpl : public ExchangeHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Exchange> + public HandlerImpl { public: - ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + ExchangeHandlerImpl(Session& session) : HandlerImpl(session) {} void declare(uint16_t ticket, const std::string& exchange, const std::string& type, @@ -111,10 +116,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations class BindingHandlerImpl : public BindingHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Binding> + public HandlerImpl { public: - BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + BindingHandlerImpl(Session& session) : HandlerImpl(session) {} framing::BindingQueryResult query(u_int16_t ticket, const std::string& exchange, @@ -125,10 +130,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations class QueueHandlerImpl : public QueueHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Queue> + public HandlerImpl { public: - QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + QueueHandlerImpl(Session& session) : HandlerImpl(session) {} void declare(uint16_t ticket, const std::string& queue, const std::string& alternateExchange, @@ -151,12 +156,12 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations class BasicHandlerImpl : public BasicHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Basic> + public HandlerImpl { NameGenerator tagGenerator; public: - BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {} + BasicHandlerImpl(Session& session) : HandlerImpl(session), tagGenerator("sgen") {} void qos(uint32_t prefetchSize, uint16_t prefetchCount, bool global); @@ -173,10 +178,10 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations class TxHandlerImpl : public TxHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Tx> + public HandlerImpl { public: - TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + TxHandlerImpl(Session& session) : HandlerImpl(session) {} void select(); void commit(); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 3dd6bfbb29..f4e9eec331 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -68,6 +68,7 @@ class Connection : public sys::ConnectionInputHandler, void setHeartbeat(uint16_t hb) { heartbeat = hb; } void setStagingThreshold(uint64_t st) { stagingThreshold = st; } + Broker& getBroker() { return broker; } Broker& broker; std::vector<Queue::shared_ptr> exclusiveQueues; diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 619d59f710..9196fa71a0 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -21,6 +21,7 @@ #include "DeliveryRecord.h" #include "DeliverableMessage.h" #include "Session.h" +#include "BrokerExchange.h" #include "qpid/log/Statement.h" using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 15ece77c6f..7ed42d285b 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -26,14 +26,14 @@ using namespace qpid::broker; using namespace qpid::framing; using std::string; -DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {} +DtxHandlerImpl::DtxHandlerImpl(Session& s) : HandlerImpl(s) {} // DtxDemarcationHandler: void DtxHandlerImpl::select() { - session.selectDtx(); + getSession().selectDtx(); } DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, @@ -43,7 +43,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, { try { if (fail) { - session.endDtx(xid, true); + getSession().endDtx(xid, true); if (suspend) { throw ConnectionException(503, "End and suspend cannot both be set."); } else { @@ -51,9 +51,9 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, } } else { if (suspend) { - session.suspendDtx(xid); + getSession().suspendDtx(xid); } else { - session.endDtx(xid, false); + getSession().endDtx(xid, false); } return DtxDemarcationEndResult(XA_OK); } @@ -72,9 +72,9 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, } try { if (resume) { - session.resumeDtx(xid); + getSession().resumeDtx(xid); } else { - session.startDtx(xid, broker.getDtxManager(), join); + getSession().startDtx(xid, getBroker().getDtxManager(), join); } return DtxDemarcationStartResult(XA_OK); } catch (const DtxTimeoutException& e) { @@ -88,7 +88,7 @@ DtxCoordinationPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/, const string& xid) { try { - bool ok = broker.getDtxManager().prepare(xid); + bool ok = getBroker().getDtxManager().prepare(xid); return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { return DtxCoordinationPrepareResult(XA_RBTIMEOUT); @@ -100,7 +100,7 @@ DtxCoordinationCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/, bool onePhase) { try { - bool ok = broker.getDtxManager().commit(xid, onePhase); + bool ok = getBroker().getDtxManager().commit(xid, onePhase); return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { return DtxCoordinationCommitResult(XA_RBTIMEOUT); @@ -112,7 +112,7 @@ DtxCoordinationRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/, const string& xid ) { try { - broker.getDtxManager().rollback(xid); + getBroker().getDtxManager().rollback(xid); return DtxCoordinationRollbackResult(XA_OK); } catch (const DtxTimeoutException& e) { return DtxCoordinationRollbackResult(XA_RBTIMEOUT); @@ -136,7 +136,7 @@ DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, // note that this restricts the length of the xids more than is // strictly 'legal', but that is ok for testing std::set<std::string> xids; - broker.getStore().collectPreparedXids(xids); + getBroker().getStore().collectPreparedXids(xids); uint size(0); for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { size += i->size() + 1/*shortstr size*/; @@ -167,7 +167,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) { - uint32_t timeout = broker.getDtxManager().getTimeout(xid); + uint32_t timeout = getBroker().getDtxManager().getTimeout(xid); return DtxCoordinationGetTimeoutResult(timeout); } @@ -176,7 +176,7 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/, const string& xid, u_int32_t timeout) { - broker.getDtxManager().setTimeout(xid, timeout); + getBroker().getDtxManager().setTimeout(xid, timeout); } diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index da6379b26c..7f8eaac335 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -27,12 +27,12 @@ namespace qpid { namespace broker { class DtxHandlerImpl - : public CoreRefs, + : public HandlerImpl, public framing::AMQP_ServerOperations::DtxCoordinationHandler, public framing::AMQP_ServerOperations::DtxDemarcationHandler { public: - DtxHandlerImpl(CoreRefs& parent); + DtxHandlerImpl(Session&); // DtxCoordinationHandler: diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h index 6350aa32c5..c06188d3c0 100644 --- a/cpp/src/qpid/broker/HandlerImpl.h +++ b/cpp/src/qpid/broker/HandlerImpl.h @@ -19,49 +19,47 @@ * */ -#include "Broker.h" -#include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/framing/ChannelAdapter.h" +#include "Session.h" +#include "SessionHandler.h" +#include "Connection.h" namespace qpid { namespace broker { -class Connection; -class Session; +class Broker; /** - * A collection of references to the core objects required by an adapter, - * and a client proxy. + * Base template for protocol handler implementations. + * Provides convenience methods for getting common session objects. */ -struct CoreRefs -{ - CoreRefs(Session& ch, Connection& c, Broker& b, framing::ChannelAdapter& a) - : session(ch), connection(c), broker(b), adapter(a), proxy(a.getHandlers().out) {} - - Session& session; - Connection& connection; - Broker& broker; - framing::ChannelAdapter& adapter; - framing::AMQP_ClientProxy proxy; -}; +class HandlerImpl { + protected: + HandlerImpl(Session& s) : session(s) {} + Session& getSession() { return session; } + const Session& getSession() const { return session; } + + SessionHandler* getSessionHandler() { return session.getHandler(); } + const SessionHandler* getSessionHandler() const { return session.getHandler(); } -/** - * Base template for protocol handler implementations. - * Provides the core references and appropriate AMQP class proxy. - */ -template <class ProxyType> -struct HandlerImpl : public CoreRefs { - typedef HandlerImpl<ProxyType> HandlerImplType; - HandlerImpl(CoreRefs& parent) - : CoreRefs(parent), client(ProxyType::get(proxy)) {} - ProxyType client; -}; + // Remaining functions may only be called if getSessionHandler() != 0 + framing::AMQP_ClientProxy& getProxy() { return getSessionHandler()->getProxy(); } + const framing::AMQP_ClientProxy& getProxy() const { return getSessionHandler()->getProxy(); } + Connection& getConnection() { return getSessionHandler()->getConnection(); } + const Connection& getConnection() const { return getSessionHandler()->getConnection(); } + + Broker& getBroker() { return getConnection().broker; } + const Broker& getBroker() const { return getConnection().broker; } + private: + Session& session; +}; }} // namespace qpid::broker #endif /*!_broker_HandlerImpl_h*/ + + diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 39f9f85c13..f0cbf027df 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -139,7 +139,7 @@ void Message::releaseContent(MessageStore* _store) frames.remove(TypeFilter(CONTENT_BODY)); } -void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize) +void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) { if (isContentReleased()) { //load content from store in chunks of maxContentSize @@ -148,7 +148,7 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) { uint64_t remaining = expectedSize - offset; - AMQFrame frame(channel, AMQContentBody()); + AMQFrame frame(0, AMQContentBody()); string& data = frame.castBody<AMQContentBody>()->getData(); store->loadContent(*this, data, offset, @@ -168,15 +168,14 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t Count c; frames.map_if(c, TypeFilter(CONTENT_BODY)); - SendContent f(out, channel, maxFrameSize, c.getCount()); + SendContent f(out, maxFrameSize, c.getCount()); frames.map_if(f, TypeFilter(CONTENT_BODY)); } } -void Message::sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t /*maxFrameSize*/) +void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) { - Relay f(out, channel); - frames.map_if(f, TypeFilter(HEADER_BODY)); + frames.map_if(out, TypeFilter(HEADER_BODY)); } MessageAdapter& Message::getAdapter() const diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 53640c65ad..c1dd2f28fa 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -114,8 +114,8 @@ public: */ void releaseContent(MessageStore* store); - void sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize); - void sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize); + void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize); + void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize); bool isContentLoaded() const; diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index 6471245ed9..edacd7a1c1 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -23,7 +23,7 @@ #include "DeliveryToken.h" #include "Message.h" #include "BrokerQueue.h" -#include "qpid/framing/ChannelAdapter.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/BasicGetOkBody.h" #include "qpid/framing/MessageTransferBody.h" @@ -114,7 +114,7 @@ DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::st } void MessageDelivery::deliver(Message::shared_ptr msg, - framing::ChannelAdapter& channel, + framing::FrameHandler& handler, DeliveryId id, DeliveryToken::shared_ptr token, uint16_t framesize) @@ -123,15 +123,10 @@ void MessageDelivery::deliver(Message::shared_ptr msg, //another may well have the wrong headers; however we will only //have one content class for 0-10 proper - FrameHandler& handler = channel.getHandlers().out; - - //send method boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); AMQFrame method = t->sendMethod(msg, id); method.setEof(false); - method.setChannel(channel.getId()); handler.handle(method); - - msg->sendHeader(handler, channel.getId(), framesize); - msg->sendContent(handler, channel.getId(), framesize); + msg->sendHeader(handler, framesize); + msg->sendContent(handler, framesize); } diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h index b87ef2a5ce..3beb268ca7 100644 --- a/cpp/src/qpid/broker/MessageDelivery.h +++ b/cpp/src/qpid/broker/MessageDelivery.h @@ -23,15 +23,9 @@ */ #include <boost/shared_ptr.hpp> #include "DeliveryId.h" +#include "qpid/framing/FrameHandler.h" namespace qpid { - -namespace framing { - -class ChannelAdapter; - -} - namespace broker { class DeliveryToken; @@ -49,7 +43,7 @@ public: u_int8_t confirmMode, u_int8_t acquireMode); - static void deliver(boost::shared_ptr<Message> msg, framing::ChannelAdapter& channel, + static void deliver(boost::shared_ptr<Message> msg, framing::FrameHandler& out, DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize); }; diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index d9b91c1617..a31ac78aa4 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -36,8 +36,8 @@ namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) - : HandlerImplType(parent) {} +MessageHandlerImpl::MessageHandlerImpl(Session& session) + : HandlerImpl(session) {} // // Message class method handlers @@ -46,7 +46,7 @@ MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) void MessageHandlerImpl::cancel(const string& destination ) { - session.cancel(destination); + getSession().cancel(destination); } void @@ -97,14 +97,14 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, bool exclusive, const framing::FieldTable& filter ) { - Queue::shared_ptr queue = session.getQueue(queueName); - if(!destination.empty() && session.exists(destination)) + Queue::shared_ptr queue = getSession().getQueue(queueName); + if(!destination.empty() && getSession().exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; //NB: am assuming pre-acquired = 0 as discussed on SIG list as is //the previously expected behaviour - session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); @@ -117,9 +117,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& destination, bool noAck ) { - Queue::shared_ptr queue = session.getQueue(queueName); + Queue::shared_ptr queue = getSession().getQueue(queueName); - if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ + if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -148,14 +148,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, bool /*global*/ ) { //TODO: handle global - session.setPrefetchSize(prefetchSize); - session.setPrefetchCount(prefetchCount); + getSession().setPrefetchSize(prefetchSize); + getSession().setPrefetchCount(prefetchCount); } void MessageHandlerImpl::recover(bool requeue) { - session.recover(requeue); + getSession().recover(requeue); } void @@ -166,7 +166,7 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/ } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - session.reject(i->getValue(), (++i)->getValue()); + getSession().reject(i->getValue(), (++i)->getValue()); } } @@ -175,10 +175,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i if (unit == 0) { //message - session.addMessageCredit(destination, value); + getSession().addMessageCredit(destination, value); } else if (unit == 1) { //bytes - session.addByteCredit(destination, value); + getSession().addByteCredit(destination, value); } else { //unknown throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); @@ -190,10 +190,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) { if (mode == 0) { //credit - session.setCreditMode(destination); + getSession().setCreditMode(destination); } else if (mode == 1) { //window - session.setWindowMode(destination); + getSession().setWindowMode(destination); } else{ throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); } @@ -201,12 +201,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) void MessageHandlerImpl::flush(const std::string& destination) { - session.flush(destination); + getSession().flush(destination); } void MessageHandlerImpl::stop(const std::string& destination) { - session.stop(destination); + getSession().stop(destination); } void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) @@ -218,11 +218,11 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /* } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - session.acquire(i->getValue(), (++i)->getValue(), results); + getSession().acquire(i->getValue(), (++i)->getValue(), results); } results = results.condense(); - client.acquired(results); + getProxy().getMessage().acquired(results); } void MessageHandlerImpl::release(const SequenceNumberSet& transfers) @@ -232,7 +232,7 @@ void MessageHandlerImpl::release(const SequenceNumberSet& transfers) } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - session.release(i->getValue(), (++i)->getValue()); + getSession().release(i->getValue(), (++i)->getValue()); } } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h index 35d34bf94e..e4d66428d1 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -34,10 +34,10 @@ class MessageMessage; class MessageHandlerImpl : public framing::AMQP_ServerOperations::MessageHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Message> + public HandlerImpl { public: - MessageHandlerImpl(CoreRefs& parent); + MessageHandlerImpl(Session&); void append(const std::string& reference, const std::string& bytes); diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 60c6a5cc10..2dd7861e4a 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -37,13 +37,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(Session& s) : - session(s), - connection(s.getAdapter()->getConnection()), - adapter(s, static_cast<ChannelAdapter&>(*this)) -{ - init(s.getAdapter()->getChannel(), s.out, 0); -} +SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {} void SemanticHandler::handle(framing::AMQFrame& frame) { @@ -86,24 +80,24 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran if (outgoing.lwm < mark) { outgoing.lwm = mark; //ack messages: - session.ackCumulative(mark.getValue()); + getSession().ackCumulative(mark.getValue()); } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } } void SemanticHandler::sendCompletion() { - if (isOpen()) { + if (getSessionHandler()) { SequenceNumber mark = incoming.getMark(); SequenceNumberSet range = incoming.getRange(); Mutex::ScopedLock l(outLock); - ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range)); + getProxy().getExecution().complete(mark.getValue(), range); } } void SemanticHandler::flush() @@ -129,7 +123,8 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { - SequenceNumber id = incoming.next(); + SequenceNumber id = incoming.next(); + BrokerAdapter adapter(getSession()); InvocationVisitor v(&adapter); method->accept(v); incoming.complete(id); @@ -137,7 +132,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) if (!v.wasHandled()) { throw ConnectionException(540, "Not implemented"); } else if (v.hasResult()) { - ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult())); + getProxy().getExecution().result(id.getValue(), v.getResult()); } //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -159,45 +154,24 @@ void SemanticHandler::handleContent(AMQFrame& frame) } msgBuilder.handle(frame); if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags - msg->setPublisher(&connection); - session.handle(msg); + msg->setPublisher(&getConnection()); + getSession().handle(msg); msgBuilder.end(); incoming.track(msg); //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); } } } -bool SemanticHandler::isOpen() const { - // FIXME aconway 2007-08-30: remove. - return true; -} - DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax()); + MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax()); return outgoing.hwm; } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) { - MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax()); -} - -void SemanticHandler::send(const AMQBody& body) -{ - Mutex::ScopedLock l(outLock); - // FIXME aconway 2007-08-31: SessionHandler should not send - // channel/session commands via the semantic handler, it should shortcut - // directly to its own output handler. That will make the CLASS_ID - // part of the test unnecessary. - // - if (body.getMethod() && - body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) - { - ++outgoing.hwm; - } - ChannelAdapter::send(body); + MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax()); } SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) @@ -225,11 +199,3 @@ SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) throw Exception("Could not determine track"); } -//ChannelAdapter virtual methods, no longer used: -void SemanticHandler::handleMethod(framing::AMQMethodBody*){} - -void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {} - -void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {} - -void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {} diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index f17ef67bfc..4b3a05ba19 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -26,12 +26,12 @@ #include "DeliveryAdapter.h" #include "MessageBuilder.h" #include "IncomingExecutionContext.h" +#include "HandlerImpl.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/ChannelAdapter.h" namespace qpid { @@ -49,11 +49,8 @@ class Session; class SemanticHandler : public DeliveryAdapter, public framing::FrameHandler, public framing::AMQP_ServerOperations::ExecutionHandler, - private framing::ChannelAdapter + private HandlerImpl { - Session& session; - Connection& connection; - BrokerAdapter adapter; IncomingExecutionContext incoming; framing::Window outgoing; sys::Mutex outLock; @@ -68,17 +65,6 @@ class SemanticHandler : public DeliveryAdapter, void sendCompletion(); - //ChannelAdapter virtual methods: - void handleMethod(framing::AMQMethodBody* method); - - bool isOpen() const; - void handleHeader(framing::AMQHeaderBody*); - void handleContent(framing::AMQContentBody*); - void handleHeartbeat(framing::AMQHeartbeatBody*); - - void send(const framing::AMQBody& body); - - //delivery adapter methods: DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag); @@ -89,9 +75,6 @@ public: //frame handler: void handle(framing::AMQFrame& frame); - // FIXME aconway 2007-08-31: Move proxy to Session. - framing::AMQP_ClientProxy& getProxy() { return adapter.getProxy(); } - //execution class method handlers: void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); void flush(); diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index 6bc96b0049..c59119140c 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -68,11 +68,9 @@ Session::Session(SessionHandler& a, uint32_t t) flowActive(true) { outstanding.reset(); - // FIXME aconway 2007-08-29: handler to get Session, not connection. std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this)); + // FIXME aconway 2007-08-29: move deliveryAdapter to SemanticHandlerState. deliveryAdapter=semantic.get(); - // FIXME aconway 2007-08-31: Remove, workaround. - semanticHandler=semantic.get(); handlers.push_back(semantic.release()); in = &handlers[0]; out = &adapter->out; @@ -256,7 +254,7 @@ Session::ConsumerImpl::ConsumerImpl(Session* _parent, bool Session::ConsumerImpl::deliver(QueuedMessage& msg) { - if (nolocal && &parent->getAdapter()->getConnection() == msg.payload->getPublisher()) { + if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) { return false; } else { if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) { @@ -306,7 +304,7 @@ void Session::ConsumerImpl::cancel() if(queue) { queue->cancel(this); if (queue->canAutoDelete()) { - parent->getAdapter()->getConnection().broker.getQueues().destroyIf(queue->getName(), + parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); } } @@ -333,7 +331,7 @@ void Session::handle(Message::shared_ptr msg) { void Session::route(Message::shared_ptr msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName){ - cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName); + cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName); } cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h index afcd5853fa..eea36ba5fc 100644 --- a/cpp/src/qpid/broker/Session.h +++ b/cpp/src/qpid/broker/Session.h @@ -32,7 +32,6 @@ #include "NameGenerator.h" #include "Prefetch.h" #include "TxBuffer.h" -#include "SemanticHandler.h" // FIXME aconway 2007-08-31: remove #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AccumulatedAck.h" #include "qpid/shared_ptr.h" @@ -43,11 +42,6 @@ #include <vector> namespace qpid { - -namespace framing { -class AMQP_ClientProxy; -} - namespace broker { class SessionHandler; @@ -129,20 +123,15 @@ class Session : public framing::FrameHandler::Chains, ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); void acknowledged(const DeliveryRecord&); - - // FIXME aconway 2007-08-31: remove, temporary hack. - SemanticHandler* semanticHandler; - AckRange findRange(DeliveryId first, DeliveryId last); - public: Session(SessionHandler&, uint32_t timeout); ~Session(); /** Returns 0 if this session is not currently attached */ - SessionHandler* getAdapter() { return adapter; } - const SessionHandler* getAdapter() const { return adapter; } + SessionHandler* getHandler() { return adapter; } + const SessionHandler* getHandler() const { return adapter; } Broker& getBroker() const { return broker; } @@ -198,13 +187,7 @@ class Session : public framing::FrameHandler::Chains, void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired); void release(DeliveryId first, DeliveryId last); void reject(DeliveryId first, DeliveryId last); - void handle(Message::shared_ptr msg); - - framing::AMQP_ClientProxy& getProxy() { - // FIXME aconway 2007-08-31: Move proxy to Session. - return semanticHandler->getProxy(); - } }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index e46f11d98c..d4f8c25892 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -29,18 +29,10 @@ namespace qpid { namespace broker { using namespace framing; -// FIXME aconway 2007-08-31: the SessionHandler should create its -// private proxy directly on the connections out handler. -// Session/channel methods should not go thru the other layers. -// Need to get rid of ChannelAdapter and allow proxies to be created -// directly on output handlers. -// -framing::AMQP_ClientProxy& SessionHandler::getProxy() { - return session->getProxy(); -} - SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : InOutHandler(0, &c.getOutput()), connection(c), channel(ch), ignoring(false), channelHandler(*this) {} + : InOutHandler(0, &c.getOutput()), + connection(c), channel(ch), proxy(out), + ignoring(false), channelHandler(*this) {} SessionHandler::~SessionHandler() {} diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index c7c172b01d..219cd01396 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -24,14 +24,10 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" namespace qpid { - -namespace framing { -class AMQP_ClientProxy; -} - namespace broker { class Connection; @@ -51,12 +47,17 @@ class SessionHandler : public framing::FrameHandler::InOutHandler ~SessionHandler(); /** Returns 0 if not attached to a session */ - Session* getSession() const { return session.get(); } + Session* getSession() { return session.get(); } + const Session* getSession() const { return session.get(); } framing::ChannelId getChannel() const { return channel; } + Connection& getConnection() { return connection; } const Connection& getConnection() const { return connection; } + framing::AMQP_ClientProxy& getProxy() { return proxy; } + const framing::AMQP_ClientProxy& getProxy() const { return proxy; } + protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); @@ -84,10 +85,9 @@ class SessionHandler : public framing::FrameHandler::InOutHandler void assertOpen(const char* method); void assertClosed(const char* method); - framing::AMQP_ClientProxy& getProxy(); - Connection& connection; const framing::ChannelId channel; + framing::AMQP_ClientProxy proxy; shared_ptr<Session> session; bool ignoring; ChannelMethods channelHandler; diff --git a/cpp/src/qpid/framing/SendContent.cpp b/cpp/src/qpid/framing/SendContent.cpp index 573ebca9e2..62a876f7c5 100644 --- a/cpp/src/qpid/framing/SendContent.cpp +++ b/cpp/src/qpid/framing/SendContent.cpp @@ -21,7 +21,7 @@ #include "SendContent.h" -qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs, uint efc) : handler(h), channel(c), +qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t mfs, uint efc) : handler(h), maxFrameSize(mfs), expectedFrameCount(efc), frameCount(0) {} @@ -45,14 +45,13 @@ void qpid::framing::SendContent::operator()(const AMQFrame& f) } else { AMQFrame copy(f); setFlags(copy, first, last); - copy.setChannel(channel); handler.handle(copy); } } void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const { - AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size))); + AMQFrame fragment(0, AMQContentBody(body.getData().substr(offset, size))); setFlags(fragment, first, last); handler.handle(fragment); } diff --git a/cpp/src/qpid/framing/SendContent.h b/cpp/src/qpid/framing/SendContent.h index 05b5838c62..dcd5202b3e 100644 --- a/cpp/src/qpid/framing/SendContent.h +++ b/cpp/src/qpid/framing/SendContent.h @@ -37,7 +37,6 @@ namespace framing { class SendContent { mutable FrameHandler& handler; - const uint16_t channel; const uint16_t maxFrameSize; uint expectedFrameCount; uint frameCount; @@ -45,7 +44,7 @@ class SendContent void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const; void setFlags(AMQFrame& f, bool first, bool last) const; public: - SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize, uint frameCount); + SendContent(FrameHandler& _handler, uint16_t _maxFrameSize, uint frameCount); void operator()(const AMQFrame& f); }; diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h index 7b7e24b2b3..ed664d1529 100644 --- a/cpp/src/qpid/framing/frame_functors.h +++ b/cpp/src/qpid/framing/frame_functors.h @@ -82,22 +82,6 @@ public: void operator()(const AMQFrame& f) { content += f.castBody<AMQContentBody>()->getData(); } }; -class Relay -{ - FrameHandler& handler; - const uint16_t channel; - -public: - Relay(FrameHandler& h, uint16_t c) : handler(h), channel(c) {} - - void operator()(AMQFrame& f) - { - AMQFrame copy(f); - copy.setChannel(channel); - handler.handle(copy); - } -}; - class Print { std::ostream& out; |
