summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp29
1 files changed, 21 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 663565c26c..990727dda5 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -114,7 +114,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -135,7 +135,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
@@ -181,7 +181,7 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str
Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = state.getQueue(name);
+ Queue::shared_ptr queue = getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return Queue010QueryResult(queue->getName(),
@@ -204,7 +204,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = state.getQueue(name);
+ queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
@@ -245,12 +245,12 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
- state.getQueue(queue)->purge();
+ getQueue(queue)->purge();
}
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = state.getQueue(queue);
+ Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -269,7 +269,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
- HandlerImpl(s),
+ HandlerHelper(s),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
@@ -301,7 +301,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
uint64_t /*resumeTtl*/,
const FieldTable& arguments)
{
- Queue::shared_ptr queue = state.getQueue(queueName);
+ Queue::shared_ptr queue = getQueue(queueName);
if(!destination.empty() && state.exists(destination))
throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
@@ -404,6 +404,19 @@ void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
//TODO
}
+
+Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const {
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ throw SessionException(531, QPID_MSG("No queue name specified."));
+ } else {
+ queue = session.getBroker().getQueues().find(name);
+ if (!queue)
+ throw NotFoundException(QPID_MSG("Queue not found: "<<name));
+ }
+ return queue;
+}
+
}} // namespace qpid::broker