diff options
| author | Alan Conway <aconway@apache.org> | 2007-08-31 20:51:22 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-08-31 20:51:22 +0000 |
| commit | 761e10501fe5ea51f9d8c40d9a200ae27193ab23 (patch) | |
| tree | e2d4bdfdc0b9383661947378a1f183387501637c /cpp/src/qpid/broker/BrokerAdapter.cpp | |
| parent | 655b3b5806bafdd784f6a9c242e26341bd6aeccc (diff) | |
| download | qpid-python-761e10501fe5ea51f9d8c40d9a200ae27193ab23.tar.gz | |
* Summary:
- Moved BrokerChannel functionality into Session.
- Moved ChannelHandler methods handling into SessionAdapter.
- Updated all handlers to use session.
(We're still using AMQP channel methods in SessionAdapter)
Roles & responsibilities:
Session:
- represents an _open_ session, may be active or suspended.
- ows all session state including handler chains.
- attahced to SessionAdapter when active, not when suspended.
SessionAdapter:
- reprents the association of a channel with a session.
- owned by Connection, kept in the session map.
- channel open == SessionAdapter.getSessio() != 0
Anything that depends on attachment to a channel, connection or
protocol should be in SessionAdpater. Anything that suvives a
session suspend belongs in Session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571575 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 140 |
1 files changed, 56 insertions, 84 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 07b7b4f638..a6e9c007cf 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -15,10 +15,9 @@ * limitations under the License. * */ -#include <boost/format.hpp> - #include "BrokerAdapter.h" -#include "BrokerChannel.h" +#include "Session.h" +#include "SessionAdapter.h" #include "Connection.h" #include "DeliveryToken.h" #include "MessageDelivery.h" @@ -28,18 +27,23 @@ namespace qpid { namespace broker { -using boost::format; using namespace qpid; using namespace qpid::framing; typedef std::vector<Queue::shared_ptr> QueueVector; - - BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b, ChannelAdapter& a) : - CoreRefs(ch, c, b, a), - connection(c), +// FIXME aconway 2007-08-31: now that functionality is distributed +// between different handlers, BrokerAdapter should be dropped. +// Instead the individual class Handler interfaces can be implemented +// by the handlers responsible for those classes. +// + +BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) : + CoreRefs(s, + s.getAdapter()->getConnection(), + s.getAdapter()->getConnection().broker, + a), basicHandler(*this), - channelHandler(*this), exchangeHandler(*this), bindingHandler(*this), messageHandler(*this), @@ -52,31 +56,6 @@ typedef std::vector<Queue::shared_ptr> QueueVector; ProtocolVersion BrokerAdapter::getVersion() const { return connection.getVersion(); } - -void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){ - channel.open(); - client.openOk(); -} - -void BrokerAdapter::ChannelHandlerImpl::flow(bool active){ - channel.flow(active); - client.flowOk(active); -} - -void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){} - -void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/, - const string& /*replyText*/, - uint16_t /*classId*/, uint16_t /*methodId*/) -{ - client.closeOk(); - // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. - connection.closeChannel(channel.getId()); -} - -void BrokerAdapter::ChannelHandlerImpl::closeOk(){} - - void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, const string& alternateExchange, @@ -148,10 +127,10 @@ ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket } BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, - const std::string& exchangeName, - const std::string& queueName, - const std::string& key, - const framing::FieldTable& args) + const std::string& exchangeName, + const std::string& queueName, + const std::string& key, + const framing::FieldTable& args) { Exchange::shared_ptr exchange; try { @@ -181,7 +160,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = getQueue(name); + Queue::shared_ptr queue = session.getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return QueueQueryResult(queue->getName(), @@ -205,7 +184,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = getQueue(name); + queue = session.getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = @@ -216,7 +195,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - channel.setDefaultQueue(queue); + session.setDefaultQueue(queue); if (alternate) { queue->setAlternateExchange(alternate); alternate->incAlternateUsers(); @@ -236,17 +215,16 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& } } if (exclusive && !queue->isExclusiveOwner(&connection)) - throw ChannelException( - 405, - format("Cannot grant exclusive access to queue '%s'") - % queue->getName()); + throw ResourceLockedException( + QPID_MSG("Cannot grant exclusive access to queue " + << queue->getName())); } void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = getQueue(queueName); + Queue::shared_ptr queue = session.getQueue(queueName); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -257,23 +235,23 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu } } }else{ - throw ChannelException( - 404, "Bind failed. No such exchange: " + exchangeName); + throw NotFoundException( + "Bind failed. No such exchange: " + exchangeName); } } void BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, - const string& queueName, - const string& exchangeName, - const string& routingKey, - const qpid::framing::FieldTable& arguments ) + const string& queueName, + const string& exchangeName, + const string& routingKey, + const qpid::framing::FieldTable& arguments ) { - Queue::shared_ptr queue = getQueue(queueName); - if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); + Queue::shared_ptr queue = session.getQueue(queueName); + if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); - if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); + if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) { broker.getStore().unbind(*exchange, *queue, routingKey, arguments); @@ -282,17 +260,16 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ - getQueue(queue)->purge(); + session.getQueue(queue)->purge(); } -void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty){ +void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = getQueue(queue); + Queue::shared_ptr q = session.getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ - throw ChannelException(406, "Queue not empty."); + throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ - throw ChannelException(406, "Queue in use."); + throw PreconditionFailedException("Queue in use."); }else{ //remove the queue from the list of exclusive queues if necessary if(q->isExclusiveOwner(&connection)){ @@ -310,18 +287,18 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ //TODO: handle global - channel.setPrefetchSize(prefetchSize); - channel.setPrefetchCount(prefetchCount); + session.setPrefetchSize(prefetchSize); + session.setPrefetchCount(prefetchCount); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, - const string& queueName, const string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait, const FieldTable& fields) + const string& queueName, const string& consumerTag, + bool noLocal, bool noAck, bool exclusive, + bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = getQueue(queueName); - if(!consumerTag.empty() && channel.exists(consumerTag)){ + Queue::shared_ptr queue = session.getQueue(queueName); + if(!consumerTag.empty() && session.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } string newTag = consumerTag; @@ -329,7 +306,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); - channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields); + session.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields); if(!nowait) client.consumeOk(newTag); @@ -338,13 +315,13 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, } void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ - channel.cancel(consumerTag); + session.cancel(consumerTag); } void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = getQueue(queueName); + Queue::shared_ptr queue = session.getQueue(queueName); DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); - if(!channel.get(token, queue, !noAck)){ + if(!session.get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack client.getEmpty(clusterId); @@ -353,9 +330,9 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ if (multiple) { - channel.ackCumulative(deliveryTag); + session.ackCumulative(deliveryTag); } else { - channel.ackRange(deliveryTag, deliveryTag); + session.ackRange(deliveryTag, deliveryTag); } } @@ -363,29 +340,24 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) { - channel.recover(requeue); + session.recover(requeue); } void BrokerAdapter::TxHandlerImpl::select() { - channel.startTx(); + session.startTx(); } void BrokerAdapter::TxHandlerImpl::commit() { - channel.commit(&broker.getStore()); + session.commit(&broker.getStore()); } void BrokerAdapter::TxHandlerImpl::rollback() { - channel.rollback(); - channel.recover(false); + session.rollback(); + session.recover(false); } -void BrokerAdapter::ChannelHandlerImpl::ok() -{ - //no specific action required, generic response handling should be sufficient -} - }} // namespace qpid::broker |
