diff options
| author | Alan Conway <aconway@apache.org> | 2007-09-18 19:43:29 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-09-18 19:43:29 +0000 |
| commit | 6aeb03f0f5ac7ede957995fc784367a30920c683 (patch) | |
| tree | 7fe35f0ce9fe6bf17dbd6416deb6069ef9c7b07c /cpp/src/qpid/broker/BrokerAdapter.cpp | |
| parent | 8b039e1ed4e4340917d7fd3d8202049e691ca6ec (diff) | |
| download | qpid-python-6aeb03f0f5ac7ede957995fc784367a30920c683.tar.gz | |
Refactor HandlerImpl to use Session rather than CoreRefs.
Remove most uses of ChannelAdapter in broker code.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577027 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 126 |
1 files changed, 60 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 35a87784d2..c266b36dfb 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -38,42 +38,35 @@ typedef std::vector<Queue::shared_ptr> QueueVector; // by the handlers responsible for those classes. // -BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) : - CoreRefs(s, - s.getAdapter()->getConnection(), - s.getAdapter()->getConnection().broker, - a), - basicHandler(*this), - exchangeHandler(*this), - bindingHandler(*this), - messageHandler(*this), - queueHandler(*this), - txHandler(*this), - dtxHandler(*this) +BrokerAdapter::BrokerAdapter(Session& s) : + HandlerImpl(s), + basicHandler(s), + exchangeHandler(s), + bindingHandler(s), + messageHandler(s), + queueHandler(s), + txHandler(s), + dtxHandler(s) {} -ProtocolVersion BrokerAdapter::getVersion() const { - return connection.getVersion(); -} - void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, const string& alternateExchange, bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { - alternate = broker.getExchanges().get(alternateExchange); + alternate = getBroker().getExchanges().get(alternateExchange); } if(passive){ - Exchange::shared_ptr actual(broker.getExchanges().get(exchange)); + Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); checkType(actual, type); checkAlternate(actual, alternate); }else{ try{ - std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args); + std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); if (response.second) { if (durable) { - broker.getStore().create(*response.first); + getBroker().getStore().create(*response.first); } if (alternate) { response.first->setAlternate(alternate); @@ -109,17 +102,17 @@ void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exc void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ //TODO: implement unused - Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange."); - if (exchange->isDurable()) broker.getStore().destroy(*exchange); + if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); - broker.getExchanges().destroy(name); + getBroker().getExchanges().destroy(name); } ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { - Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { return ExchangeQueryResult("", false, true, FieldTable()); @@ -134,12 +127,12 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ { Exchange::shared_ptr exchange; try { - exchange = broker.getExchanges().get(exchangeName); + exchange = getBroker().getExchanges().get(exchangeName); } catch (const ChannelException&) {} Queue::shared_ptr queue; if (!queueName.empty()) { - queue = broker.getQueues().find(queueName); + queue = getBroker().getQueues().find(queueName); } if (!exchange) { @@ -160,7 +153,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = session.getQueue(name); + Queue::shared_ptr queue = getSession().getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return QueueQueryResult(queue->getName(), @@ -179,22 +172,22 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { - alternate = broker.getExchanges().get(alternateExchange); + alternate = getBroker().getExchanges().get(alternateExchange); } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = session.getQueue(name); + queue = getSession().getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = - broker.getQueues().declare( + getBroker().getQueues().declare( name, durable, autoDelete && !exclusive, - exclusive ? &connection : 0); + exclusive ? &getConnection() : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - session.setDefaultQueue(queue); + getSession().setDefaultQueue(queue); if (alternate) { queue->setAlternateExchange(alternate); alternate->incAlternateUsers(); @@ -204,16 +197,16 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& queue_created.first->create(arguments); //add default binding: - broker.getExchanges().getDefault()->bind(queue, name, 0); - queue->bound(broker.getExchanges().getDefault()->getName(), name, arguments); + getBroker().getExchanges().getDefault()->bind(queue, name, 0); + queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); //handle automatic cleanup: if (exclusive) { - connection.exclusiveQueues.push_back(queue); + getConnection().exclusiveQueues.push_back(queue); } } } - if (exclusive && !queue->isExclusiveOwner(&connection)) + if (exclusive && !queue->isExclusiveOwner(&getConnection())) throw ResourceLockedException( QPID_MSG("Cannot grant exclusive access to queue " << queue->getName())); @@ -223,14 +216,14 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = session.getQueue(queueName); - Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); + Queue::shared_ptr queue = getSession().getQueue(queueName); + Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; if (exchange->bind(queue, exchangeRoutingKey, &arguments)) { queue->bound(exchangeName, routingKey, arguments); if (exchange->isDurable() && queue->isDurable()) { - broker.getStore().bind(*exchange, *queue, routingKey, arguments); + getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); } } }else{ @@ -246,38 +239,38 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, const string& routingKey, const qpid::framing::FieldTable& arguments ) { - Queue::shared_ptr queue = session.getQueue(queueName); + Queue::shared_ptr queue = getSession().getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); - Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); + Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) { - broker.getStore().unbind(*exchange, *queue, routingKey, arguments); + getBroker().getStore().unbind(*exchange, *queue, routingKey, arguments); } } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ - session.getQueue(queue)->purge(); + getSession().getQueue(queue)->purge(); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = session.getQueue(queue); + Queue::shared_ptr q = getSession().getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ throw PreconditionFailedException("Queue in use."); }else{ //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(&connection)){ - QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); - if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); + if(q->isExclusiveOwner(&getConnection())){ + QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q); + if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i); } q->destroy(); - broker.getQueues().destroy(queue); - q->unbind(broker.getExchanges(), q); + getBroker().getQueues().destroy(queue); + q->unbind(getBroker().getExchanges(), q); } } @@ -286,8 +279,8 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ //TODO: handle global - session.setPrefetchSize(prefetchSize); - session.setPrefetchCount(prefetchCount); + getSession().setPrefetchSize(prefetchSize); + getSession().setPrefetchCount(prefetchCount); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -296,8 +289,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = session.getQueue(queueName); - if(!consumerTag.empty() && session.exists(consumerTag)){ + Queue::shared_ptr queue = getSession().getQueue(queueName); + if(!consumerTag.empty() && getSession().exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } string newTag = consumerTag; @@ -305,33 +298,34 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); - session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); + getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); - if(!nowait) client.consumeOk(newTag); + if(!nowait) + getProxy().getBasic().consumeOk(newTag); //allow messages to be dispatched if required as there is now a consumer: queue->requestDispatch(); } void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ - session.cancel(consumerTag); + getSession().cancel(consumerTag); } void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = session.getQueue(queueName); + Queue::shared_ptr queue = getSession().getQueue(queueName); DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); - if(!session.get(token, queue, !noAck)){ + if(!getSession().get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack - client.getEmpty(clusterId); + getProxy().getBasic().getEmpty(clusterId); } } void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ if (multiple) { - session.ackCumulative(deliveryTag); + getSession().ackCumulative(deliveryTag); } else { - session.ackRange(deliveryTag, deliveryTag); + getSession().ackRange(deliveryTag, deliveryTag); } } @@ -339,23 +333,23 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) { - session.recover(requeue); + getSession().recover(requeue); } void BrokerAdapter::TxHandlerImpl::select() { - session.startTx(); + getSession().startTx(); } void BrokerAdapter::TxHandlerImpl::commit() { - session.commit(&broker.getStore()); + getSession().commit(&getBroker().getStore()); } void BrokerAdapter::TxHandlerImpl::rollback() { - session.rollback(); - session.recover(false); + getSession().rollback(); + getSession().recover(false); } }} // namespace qpid::broker |
