summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp23
1 files changed, 23 insertions, 0 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 577c053413..86c5024deb 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -424,6 +424,10 @@ BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId,
if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
else throw ChannelException(403, "Access would violate previously granted exclusivity");
}
+
+ connection.getChannel(channel).cancel(destination);
+
+ connection.client->getMessageHandler()->ok(channel);
}
void
@@ -456,6 +460,25 @@ BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
u_int64_t /*value*/ )
{
assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
+ Channel& channel = connection.getChannel(channelId);
+ if(!destination.empty() && channel.exists(destination)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ string newTag = destination;
+ channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
+
+ connection.client->getMessageHandler()->ok(channelId);
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
}
void