diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 29 |
1 files changed, 21 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 663565c26c..990727dda5 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -114,7 +114,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -135,7 +135,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); @@ -181,7 +181,7 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = state.getQueue(name); + Queue::shared_ptr queue = getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return Queue010QueryResult(queue->getName(), @@ -204,7 +204,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = state.getQueue(name); + queue = getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = @@ -245,12 +245,12 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ - state.getQueue(queue)->purge(); + getQueue(queue)->purge(); } void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = state.getQueue(queue); + Queue::shared_ptr q = getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -269,7 +269,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : - HandlerImpl(s), + HandlerHelper(s), releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)), acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2)) @@ -301,7 +301,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, uint64_t /*resumeTtl*/, const FieldTable& arguments) { - Queue::shared_ptr queue = state.getQueue(queueName); + Queue::shared_ptr queue = getQueue(queueName); if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); @@ -404,6 +404,19 @@ void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/, //TODO } + +Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { + Queue::shared_ptr queue; + if (name.empty()) { + throw SessionException(531, QPID_MSG("No queue name specified.")); + } else { + queue = session.getBroker().getQueues().find(name); + if (!queue) + throw NotFoundException(QPID_MSG("Queue not found: "<<name)); + } + return queue; +} + }} // namespace qpid::broker |
