diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 292 |
1 files changed, 111 insertions, 181 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 3d62e73185..63c4b660b2 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/SequenceSet.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/broker/SessionState.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -64,53 +65,56 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const const string& alternateExchange, bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_TYPE, type)); - params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) )); - params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange declare request from " << getConnection().getUserId())); - } - //TODO: implement autoDelete Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { alternate = getBroker().getExchanges().get(alternateExchange); } if(passive){ + AclModule* acl = getBroker().getAcl(); + if (acl) { + //TODO: why does a passive declare require create + //permission? The purpose of the passive flag is to state + //that the exchange should *not* created. For + //authorisation a passive declare is similar to + //exchange-query. + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_TYPE, type)); + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, _TRUE)); + params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_EXCHANGE,exchange,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << getConnection().getUserId())); + } Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); checkType(actual, type); checkAlternate(actual, alternate); - }else{ + }else{ if(exchange.find("amq.") == 0 || exchange.find("qpid.") == 0) { throw framing::NotAllowedException(QPID_MSG("Exchange names beginning with \"amq.\" or \"qpid.\" are reserved. (exchange=\"" << exchange << "\")")); } try{ - std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); - if (response.second) { - if (alternate) { - response.first->setAlternate(alternate); - alternate->incAlternateUsers(); - } - if (durable) { - getBroker().getStore().create(*response.first, args); - } - } else { + std::pair<Exchange::shared_ptr, bool> response = + getBroker().createExchange(exchange, type, durable, alternateExchange, args, + getConnection().getUserId(), getConnection().getUrl()); + if (!response.second) { + //exchange already there, not created checkType(response.first, type); checkAlternate(response.first, alternate); + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), + getConnection().getUserId(), + exchange, + type, + alternateExchange, + durable, + false, + ManagementAgent::toMap(args), + "existing")); } - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, - alternateExchange, durable, false, ManagementAgent::toMap(args), - response.second ? "created" : "existing")); - }catch(UnknownExchangeTypeException& /*e*/){ - throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); + throw NotFoundException(QPID_MSG("Exchange type not implemented: " << type)); } } } @@ -134,22 +138,8 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/) { - AclModule* acl = getBroker().getAcl(); - if (acl) { - if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << getConnection().getUserId())); - } - - //TODO: implement unused - Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); - if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); - if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); - getBroker().getExchanges().destroy(name); - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name)); + //TODO: implement if-unused + getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl()); } ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) @@ -169,67 +159,19 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam } void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, - const string& exchangeName, const string& routingKey, - const FieldTable& arguments) + const string& exchangeName, const string& routingKey, + const FieldTable& arguments) { - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); - params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey)); - - if (!acl->authorise(getConnection().getUserId(),acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << getConnection().getUserId())); - } - - Queue::shared_ptr queue = getQueue(queueName); - Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); - if(exchange){ - string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - if (exchange->bind(queue, exchangeRoutingKey, &arguments)) { - queue->bound(exchangeName, routingKey, arguments); - if (exchange->isDurable() && queue->isDurable()) { - getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); - } - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, - queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments))); - } - }else{ - throw NotFoundException("Bind failed. No such exchange: " + exchangeName); - } + getBroker().bind(queueName, exchangeName, routingKey, arguments, + getConnection().getUserId(), getConnection().getUrl()); } void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); - params.insert(make_pair(acl::PROP_ROUTINGKEY, routingKey)); - if (!acl->authorise(getConnection().getUserId(),acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << getConnection().getUserId())); - } - - Queue::shared_ptr queue = getQueue(queueName); - if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); - - Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); - if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); - - //TODO: revise unbind to rely solely on binding key (not args) - if (exchange->unbind(queue, routingKey, 0)) { - if (exchange->isDurable() && queue->isDurable()) - getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey)); - } + getBroker().unbind(queueName, exchangeName, routingKey, + getConnection().getUserId(), getConnection().getUrl()); } ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, @@ -332,52 +274,42 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange, bool passive, bool durable, bool exclusive, bool autoDelete, const qpid::framing::FieldTable& arguments) -{ - AclModule* acl = getBroker().getAcl(); - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, std::string(passive ? _TRUE : _FALSE) )); - params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); - params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); - params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); - params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); - - if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); - } - - Exchange::shared_ptr alternate; - if (!alternateExchange.empty()) { - alternate = getBroker().getExchanges().get(alternateExchange); - } +{ Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = getQueue(name); + AclModule* acl = getBroker().getAcl(); + if (acl) { + //TODO: why does a passive declare require create + //permission? The purpose of the passive flag is to state + //that the queue should *not* created. For + //authorisation a passive declare is similar to + //queue-query (or indeed a qmf query). + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, _TRUE)); + params.insert(make_pair(acl::PROP_DURABLE, std::string(durable ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_EXCLUSIVE, std::string(exclusive ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_AUTODELETE, std::string(autoDelete ? _TRUE : _FALSE))); + params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + if (!acl->authorise(getConnection().getUserId(),acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) + throw UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << getConnection().getUserId())); + } + queue = getQueue(name); //TODO: check alternate-exchange is as expected } else { - std::pair<Queue::shared_ptr, bool> queue_created = - getBroker().getQueues().declare(name, durable, - autoDelete, - exclusive ? &session : 0); + std::pair<Queue::shared_ptr, bool> queue_created = + getBroker().createQueue(name, durable, + autoDelete, + exclusive ? &session : 0, + alternateExchange, + arguments, + getConnection().getUserId(), + getConnection().getUrl()); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - if (alternate) { - queue->setAlternateExchange(alternate); - alternate->incAlternateUsers(); - } - - //apply settings & create persistent record if required - try { queue_created.first->create(arguments); } - catch (...) { getBroker().getQueues().destroy(name); throw; } - - //add default binding: - getBroker().getExchanges().getDefault()->bind(queue, name, 0); - queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); - //handle automatic cleanup: if (exclusive) { exclusiveQueues.push_back(queue); @@ -386,21 +318,20 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& if (exclusive && queue->setExclusiveOwner(&session)) { exclusiveQueues.push_back(queue); } + ManagementAgent* agent = getBroker().getManagementAgent(); + if (agent) + agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), + name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), + "existing")); } - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), - queue_created.second ? "created" : "existing")); } - if (exclusive && !queue->isExclusiveOwner(&session)) + if (exclusive && !queue->isExclusiveOwner(&session)) throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue " << queue->getName())); -} - - +} + void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ AclModule* acl = getBroker().getAcl(); if (acl) @@ -409,40 +340,32 @@ void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ throw UnauthorizedAccessException(QPID_MSG("ACL denied queue purge request from " << getConnection().getUserId())); } getQueue(queue)->purge(); -} - -void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::ACT_DELETE,acl::OBJ_QUEUE,queue,NULL) ) - throw UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << getConnection().getUserId())); - } +} - Queue::shared_ptr q = getQueue(queue); - if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) +void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty) +{ + if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) { throw ResourceLockedException(QPID_MSG("Cannot delete queue " - << queue << "; it is exclusive to another session")); - if(ifEmpty && q->getMessageCount() > 0){ - throw PreconditionFailedException("Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw PreconditionFailedException("Queue in use."); - }else{ + << queue->getName() << "; it is exclusive to another session")); + } else if(ifEmpty && queue->getMessageCount() > 0) { + throw PreconditionFailedException(QPID_MSG("Cannot delete queue " + << queue->getName() << "; queue not empty")); + } else if(ifUnused && queue->getConsumerCount() > 0) { + throw PreconditionFailedException(QPID_MSG("Cannot delete queue " + << queue->getName() << "; queue in use")); + } else if (queue->isExclusiveOwner(&session)) { //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(&getConnection())){ - QueueVector::iterator i = std::find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q); - if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i); - } - q->destroy(); - getBroker().getQueues().destroy(queue); - q->unbind(getBroker().getExchanges(), q); - - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue)); - q->notifyDeleted(); - } + QueueVector::iterator i = std::find(exclusiveQueues.begin(), + exclusiveQueues.end(), + queue); + if (i < exclusiveQueues.end()) exclusiveQueues.erase(i); + } +} + +void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty) +{ + getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(), + boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty)); } SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : @@ -508,7 +431,9 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, void SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) { - state.cancel(destination); + if (!state.cancel(destination)) { + throw NotFoundException(QPID_MSG("No such subscription: " << destination)); + } ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) @@ -587,7 +512,12 @@ framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const st -void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op +void SessionAdapter::ExecutionHandlerImpl::sync() +{ + session.addPendingExecutionSync(); + /** @todo KAG - need a generic mechanism to allow a command to returning "not completed" status back to SessionState */ + +} void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/) { |