diff options
Diffstat (limited to 'cpp/lib/broker/SessionHandlerImpl.cpp')
| -rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 27 |
1 files changed, 25 insertions, 2 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index b23432e29d..d1b1d996a4 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -223,7 +223,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin if (parent->channels[channel] == 0) { parent->channels[channel] = new Channel(parent->client->getProtocolVersion() , parent->context, channel, parent->framemax, parent->queues->getStore(), parent->settings.stagingThreshold); - parent->client->getChannel().openOk(channel); + parent->client->getChannel().openOk(channel, ""); } else { std::stringstream out; out << "Channel already open: " << channel; @@ -337,6 +337,25 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t } } + +void SessionHandlerImpl::QueueHandlerImpl::unbind(u_int16_t channel, + u_int16_t /*ticket*/, + const string& queueName, + const string& exchangeName, + const string& routingKey, + const FieldTable& arguments) +{ + Queue::shared_ptr queue = parent->getQueue(queueName, channel); + Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName); + if(exchange){ + exchange->unbind(queue, routingKey, &arguments); + }else{ + throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); + } + + parent->client->getQueue().unbindOk(channel); +} + void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); @@ -446,7 +465,11 @@ void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64 void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ parent->getChannel(channel)->recover(requeue); - parent->client->getBasic().recoverOk(channel); +} + +void SessionHandlerImpl::BasicHandlerImpl::recoverSync(u_int16_t channel, bool requeue){ + parent->getChannel(channel)->recover(requeue); + parent->client->getBasic().recoverSyncOk(channel); } void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){ |
