summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp26
1 files changed, 18 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 36232339e5..3c742b8d2d 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -118,8 +118,8 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+ bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& args){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -127,8 +127,10 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u
}
}else{
try{
- std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
- if(!response.second && response.first->getType() != type){
+ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
+ if (response.second) {
+ if (durable) broker.getStore().create(*response.first);
+ } else if (response.first->getType() != type) {
throw ConnectionException(
530,
"Exchange already declared to be of type "
@@ -145,10 +147,12 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u
}
void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+ const string& name, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
- broker.getExchanges().destroy(exchange);
+ Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+ if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+ broker.getExchanges().destroy(name);
if(!nowait) client.deleteOk(context.getRequestId());
}
@@ -174,6 +178,8 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint
//add default binding:
broker.getExchanges().getDefault()->bind(queue, name, 0);
+
+ //handle automatic cleanup:
if (exclusive) {
connection.exclusiveQueues.push_back(queue);
} else if(autoDelete){
@@ -202,7 +208,9 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- exchange->bind(queue, exchangeRoutingKey, &arguments);
+ if (exchange->bind(queue, exchangeRoutingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
+ broker.getStore().bind(*exchange, *queue, routingKey, arguments);
+ }
if(!nowait) client.bindOk(context.getRequestId());
}else{
throw ChannelException(
@@ -225,7 +233,9 @@ BrokerAdapter::QueueHandlerImpl::unbind(
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
- exchange->unbind(queue, routingKey, &arguments);
+ if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
+ broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
+ }
client.unbindOk(context.getRequestId());
}