summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp58
1 files changed, 8 insertions, 50 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index dc8cd6cce1..bbf6686a6c 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -38,7 +38,6 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
connection(c),
basicHandler(*this),
channelHandler(*this),
- connectionHandler(*this),
exchangeHandler(*this),
bindingHandler(*this),
messageHandler(*this),
@@ -51,47 +50,6 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
ProtocolVersion BrokerAdapter::getVersion() const {
return connection.getVersion();
}
-
-void BrokerAdapter::ConnectionHandlerImpl::startOk(
- const MethodContext&, const FieldTable& /*clientProperties*/,
- const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/)
-{
- client.tune(
- CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::secureOk(
- const MethodContext&, const string& /*response*/){}
-
-void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
- const MethodContext&, uint16_t /*channelmax*/,
- uint32_t framemax, uint16_t heartbeat)
-{
- connection.setFrameMax(framemax);
- connection.setHeartbeat(heartbeat);
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::open(
- const MethodContext& context, const string& /*virtualHost*/,
- const string& /*capabilities*/, bool /*insist*/)
-{
- string knownhosts;
- client.openOk(
- knownhosts, context.getRequestId());
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::close(
- const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk(context.getRequestId());
- connection.getOutput().close();
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
- connection.getOutput().close();
-}
void BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
@@ -208,7 +166,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = connection.getQueue(name, channel.getId());
+ queue = getQueue(name);
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
broker.getQueues().declare(
@@ -249,7 +207,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -275,7 +233,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(
const string& routingKey,
const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
@@ -290,7 +248,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(
void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
int count = queue->purge();
if(!nowait) client.purgeOk( count, context.getRequestId());
}
@@ -299,7 +257,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
- Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
+ Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw ChannelException(406, "Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -337,7 +295,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Queue::shared_ptr queue = getQueue(queueName);
if(!consumerTag.empty() && channel.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
@@ -377,8 +335,8 @@ void BrokerAdapter::BasicHandlerImpl::publish(
}
void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
+ Queue::shared_ptr queue = getQueue(queueName);
+ if(!channel.get(queue, "", !noAck)){
string clusterId;//not used, part of an imatix hack
client.getEmpty(clusterId, context.getRequestId());