summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-09-18 19:43:29 +0000
committerAlan Conway <aconway@apache.org>2007-09-18 19:43:29 +0000
commit6aeb03f0f5ac7ede957995fc784367a30920c683 (patch)
tree7fe35f0ce9fe6bf17dbd6416deb6069ef9c7b07c /cpp/src/qpid/broker/BrokerAdapter.cpp
parent8b039e1ed4e4340917d7fd3d8202049e691ca6ec (diff)
downloadqpid-python-6aeb03f0f5ac7ede957995fc784367a30920c683.tar.gz
Refactor HandlerImpl to use Session rather than CoreRefs.
Remove most uses of ChannelAdapter in broker code. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@577027 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp126
1 files changed, 60 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 35a87784d2..c266b36dfb 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -38,42 +38,35 @@ typedef std::vector<Queue::shared_ptr> QueueVector;
// by the handlers responsible for those classes.
//
-BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) :
- CoreRefs(s,
- s.getAdapter()->getConnection(),
- s.getAdapter()->getConnection().broker,
- a),
- basicHandler(*this),
- exchangeHandler(*this),
- bindingHandler(*this),
- messageHandler(*this),
- queueHandler(*this),
- txHandler(*this),
- dtxHandler(*this)
+BrokerAdapter::BrokerAdapter(Session& s) :
+ HandlerImpl(s),
+ basicHandler(s),
+ exchangeHandler(s),
+ bindingHandler(s),
+ messageHandler(s),
+ queueHandler(s),
+ txHandler(s),
+ dtxHandler(s)
{}
-ProtocolVersion BrokerAdapter::getVersion() const {
- return connection.getVersion();
-}
-
void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
- alternate = broker.getExchanges().get(alternateExchange);
+ alternate = getBroker().getExchanges().get(alternateExchange);
}
if(passive){
- Exchange::shared_ptr actual(broker.getExchanges().get(exchange));
+ Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
checkType(actual, type);
checkAlternate(actual, alternate);
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
+ std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
if (response.second) {
if (durable) {
- broker.getStore().create(*response.first);
+ getBroker().getStore().create(*response.first);
}
if (alternate) {
response.first->setAlternate(alternate);
@@ -109,17 +102,17 @@ void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exc
void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
//TODO: implement unused
- Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
- if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+ if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- broker.getExchanges().destroy(name);
+ getBroker().getExchanges().destroy(name);
}
ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
- Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
return ExchangeQueryResult("", false, true, FieldTable());
@@ -134,12 +127,12 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/
{
Exchange::shared_ptr exchange;
try {
- exchange = broker.getExchanges().get(exchangeName);
+ exchange = getBroker().getExchanges().get(exchangeName);
} catch (const ChannelException&) {}
Queue::shared_ptr queue;
if (!queueName.empty()) {
- queue = broker.getQueues().find(queueName);
+ queue = getBroker().getQueues().find(queueName);
}
if (!exchange) {
@@ -160,7 +153,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/
QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = session.getQueue(name);
+ Queue::shared_ptr queue = getSession().getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return QueueQueryResult(queue->getName(),
@@ -179,22 +172,22 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
- alternate = broker.getExchanges().get(alternateExchange);
+ alternate = getBroker().getExchanges().get(alternateExchange);
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = session.getQueue(name);
+ queue = getSession().getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
- broker.getQueues().declare(
+ getBroker().getQueues().declare(
name, durable,
autoDelete && !exclusive,
- exclusive ? &connection : 0);
+ exclusive ? &getConnection() : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- session.setDefaultQueue(queue);
+ getSession().setDefaultQueue(queue);
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -204,16 +197,16 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
queue_created.first->create(arguments);
//add default binding:
- broker.getExchanges().getDefault()->bind(queue, name, 0);
- queue->bound(broker.getExchanges().getDefault()->getName(), name, arguments);
+ getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+ queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
//handle automatic cleanup:
if (exclusive) {
- connection.exclusiveQueues.push_back(queue);
+ getConnection().exclusiveQueues.push_back(queue);
}
}
}
- if (exclusive && !queue->isExclusiveOwner(&connection))
+ if (exclusive && !queue->isExclusiveOwner(&getConnection()))
throw ResourceLockedException(
QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
@@ -223,14 +216,14 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = session.getQueue(queueName);
- Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
queue->bound(exchangeName, routingKey, arguments);
if (exchange->isDurable() && queue->isDurable()) {
- broker.getStore().bind(*exchange, *queue, routingKey, arguments);
+ getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
}else{
@@ -246,38 +239,38 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
const string& routingKey,
const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = session.getQueue(queueName);
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
- Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+ Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
- broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
+ getBroker().getStore().unbind(*exchange, *queue, routingKey, arguments);
}
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- session.getQueue(queue)->purge();
+ getSession().getQueue(queue)->purge();
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = session.getQueue(queue);
+ Queue::shared_ptr q = getSession().getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
throw PreconditionFailedException("Queue in use.");
}else{
//remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(&connection)){
- QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
- if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
+ if(q->isExclusiveOwner(&getConnection())){
+ QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
+ if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
}
q->destroy();
- broker.getQueues().destroy(queue);
- q->unbind(broker.getExchanges(), q);
+ getBroker().getQueues().destroy(queue);
+ q->unbind(getBroker().getExchanges(), q);
}
}
@@ -286,8 +279,8 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
- session.setPrefetchSize(prefetchSize);
- session.setPrefetchCount(prefetchCount);
+ getSession().setPrefetchSize(prefetchSize);
+ getSession().setPrefetchCount(prefetchCount);
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -296,8 +289,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = session.getQueue(queueName);
- if(!consumerTag.empty() && session.exists(consumerTag)){
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
+ if(!consumerTag.empty() && getSession().exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
string newTag = consumerTag;
@@ -305,33 +298,34 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
+ getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
- if(!nowait) client.consumeOk(newTag);
+ if(!nowait)
+ getProxy().getBasic().consumeOk(newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->requestDispatch();
}
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- session.cancel(consumerTag);
+ getSession().cancel(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = session.getQueue(queueName);
+ Queue::shared_ptr queue = getSession().getQueue(queueName);
DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!session.get(token, queue, !noAck)){
+ if(!getSession().get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- client.getEmpty(clusterId);
+ getProxy().getBasic().getEmpty(clusterId);
}
}
void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
if (multiple) {
- session.ackCumulative(deliveryTag);
+ getSession().ackCumulative(deliveryTag);
} else {
- session.ackRange(deliveryTag, deliveryTag);
+ getSession().ackRange(deliveryTag, deliveryTag);
}
}
@@ -339,23 +333,23 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re
void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
{
- session.recover(requeue);
+ getSession().recover(requeue);
}
void BrokerAdapter::TxHandlerImpl::select()
{
- session.startTx();
+ getSession().startTx();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
- session.commit(&broker.getStore());
+ getSession().commit(&getBroker().getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
- session.rollback();
- session.recover(false);
+ getSession().rollback();
+ getSession().recover(false);
}
}} // namespace qpid::broker