summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp23
1 files changed, 23 insertions, 0 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 86c5024deb..fa25221bbd 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -479,6 +479,10 @@ BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
else throw ChannelException(403, "Access would violate previously granted exclusivity");
}
+
+ connection.getChannel(channel).cancel(destination);
+
+ connection.client->getMessageHandler()->ok(channel);
}
void
@@ -507,6 +511,25 @@ BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel,
connection.getChannel(channel).setPrefetchCount(prefetchCount);
connection.client->getMessageHandler()->ok(channel);
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
+ Channel& channel = connection.getChannel(channelId);
+ if(!destination.empty() && channel.exists(destination)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ string newTag = destination;
+ channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
+
+ connection.client->getMessageHandler()->ok(channelId);
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
}
void