summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-02-06 14:21:02 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-02-06 14:21:02 +0000
commit80b1b0b5f443bfb3c9d62a80e1419c224d0229d8 (patch)
treee8c5dae534d780fb8be9037fec7a46e016bf6154 /cpp
parentdd05cfcd8322cf786e78ed8dbdf4e723f25e9751 (diff)
downloadqpid-python-80b1b0b5f443bfb3c9d62a80e1419c224d0229d8.tar.gz
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504152 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp23
1 files changed, 23 insertions, 0 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 86c5024deb..fa25221bbd 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -479,6 +479,10 @@ BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/,
if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
else throw ChannelException(403, "Access would violate previously granted exclusivity");
}
+
+ connection.getChannel(channel).cancel(destination);
+
+ connection.client->getMessageHandler()->ok(channel);
}
void
@@ -507,6 +511,25 @@ BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel,
connection.getChannel(channel).setPrefetchCount(prefetchCount);
connection.client->getMessageHandler()->ok(channel);
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channelId);
+ Channel& channel = connection.getChannel(channelId);
+ if(!destination.empty() && channel.exists(destination)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ string newTag = destination;
+ channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
+
+ connection.client->getMessageHandler()->ok(channelId);
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
}
void