diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 58 |
1 files changed, 8 insertions, 50 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index dc8cd6cce1..bbf6686a6c 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -38,7 +38,6 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : connection(c), basicHandler(*this), channelHandler(*this), - connectionHandler(*this), exchangeHandler(*this), bindingHandler(*this), messageHandler(*this), @@ -51,47 +50,6 @@ BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : ProtocolVersion BrokerAdapter::getVersion() const { return connection.getVersion(); } - -void BrokerAdapter::ConnectionHandlerImpl::startOk( - const MethodContext&, const FieldTable& /*clientProperties*/, - const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/) -{ - client.tune( - CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); -} - -void BrokerAdapter::ConnectionHandlerImpl::secureOk( - const MethodContext&, const string& /*response*/){} - -void BrokerAdapter::ConnectionHandlerImpl::tuneOk( - const MethodContext&, uint16_t /*channelmax*/, - uint32_t framemax, uint16_t heartbeat) -{ - connection.setFrameMax(framemax); - connection.setHeartbeat(heartbeat); -} - -void BrokerAdapter::ConnectionHandlerImpl::open( - const MethodContext& context, const string& /*virtualHost*/, - const string& /*capabilities*/, bool /*insist*/) -{ - string knownhosts; - client.openOk( - knownhosts, context.getRequestId()); -} - -void BrokerAdapter::ConnectionHandlerImpl::close( - const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/, - uint16_t /*classId*/, uint16_t /*methodId*/) -{ - client.closeOk(context.getRequestId()); - connection.getOutput().close(); -} - -void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ - connection.getOutput().close(); -} void BrokerAdapter::ChannelHandlerImpl::open( const MethodContext& context, const string& /*outOfBand*/){ @@ -208,7 +166,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = connection.getQueue(name, channel.getId()); + queue = getQueue(name); } else { std::pair<Queue::shared_ptr, bool> queue_created = broker.getQueues().declare( @@ -249,7 +207,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_ const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -275,7 +233,7 @@ BrokerAdapter::QueueHandlerImpl::unbind( const string& routingKey, const qpid::framing::FieldTable& arguments ) { - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); @@ -290,7 +248,7 @@ BrokerAdapter::QueueHandlerImpl::unbind( void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); int count = queue->purge(); if(!nowait) client.purgeOk( count, context.getRequestId()); } @@ -299,7 +257,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); - Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); + Queue::shared_ptr q = getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw ChannelException(406, "Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -337,7 +295,7 @@ void BrokerAdapter::BasicHandlerImpl::consume( bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); + Queue::shared_ptr queue = getQueue(queueName); if(!consumerTag.empty() && channel.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } @@ -377,8 +335,8 @@ void BrokerAdapter::BasicHandlerImpl::publish( } void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ + Queue::shared_ptr queue = getQueue(queueName); + if(!channel.get(queue, "", !noAck)){ string clusterId;//not used, part of an imatix hack client.getEmpty(clusterId, context.getRequestId()); |
