diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 144 |
1 files changed, 72 insertions, 72 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 2a0aa9ffee..be43dacb27 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -53,36 +53,33 @@ ProtocolVersion BrokerAdapter::getVersion() const { return connection.getVersion(); } -void BrokerAdapter::ChannelHandlerImpl::open( - const MethodContext& context, const string& /*outOfBand*/){ +void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk( - std::string()/* ID */, context.getRequestId()); + client.openOk(std::string()/* ID */);//GRS, context.getRequestId()); } -void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext& context, bool active){ +void BrokerAdapter::ChannelHandlerImpl::flow(bool active){ channel.flow(active); - client.flowOk(active, context.getRequestId()); + client.flowOk(active);//GRS, context.getRequestId()); } -void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){} -void BrokerAdapter::ChannelHandlerImpl::close( - const MethodContext& context, uint16_t /*replyCode*/, +void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { - client.closeOk(context.getRequestId()); + client.closeOk();//GRS context.getRequestId()); // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. connection.closeChannel(channel.getId()); } -void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} +void BrokerAdapter::ChannelHandlerImpl::closeOk(){} -void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type, +void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait, const FieldTable& args){ @@ -107,31 +104,30 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u } } if(!nowait){ - client.declareOk(context.getRequestId()); + client.declareOk();//GRS context.getRequestId()); } } -void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, +void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/, bool nowait){ //TODO: implement unused Exchange::shared_ptr exchange(broker.getExchanges().get(name)); if (exchange->isDurable()) broker.getStore().destroy(*exchange); broker.getExchanges().destroy(name); - if(!nowait) client.deleteOk(context.getRequestId()); + if(!nowait) client.deleteOk();//GRS context.getRequestId()); } -void BrokerAdapter::ExchangeHandlerImpl::query(const MethodContext& context, u_int16_t /*ticket*/, const string& name) +void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { Exchange::shared_ptr exchange(broker.getExchanges().get(name)); - client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs(), context.getRequestId()); + client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());//GRS, context.getRequestId()); } catch (const ChannelException& e) { - client.queryOk("", false, true, FieldTable(), context.getRequestId()); + client.queryOk("", false, true, FieldTable());//GRS, context.getRequestId()); } } -void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& context, - u_int16_t /*ticket*/, +void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, const std::string& exchangeName, const std::string& queueName, const std::string& key, @@ -148,22 +144,22 @@ void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& cont } if (!exchange) { - client.queryOk(true, false, false, false, false, context.getRequestId()); + client.queryOk(true, false, false, false, false);//GRS, context.getRequestId()); } else if (!queueName.empty() && !queue) { - client.queryOk(false, true, false, false, false, context.getRequestId()); + client.queryOk(false, true, false, false, false);//GRS, context.getRequestId()); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - client.queryOk(false, false, false, false, false, context.getRequestId()); + client.queryOk(false, false, false, false, false);//GRS, context.getRequestId()); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched, context.getRequestId()); + client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);//GRS, context.getRequestId()); } } -void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name, +void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; @@ -200,12 +196,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint if (!nowait) { string queueName = queue->getName(); client.declareOk( - queueName, queue->getMessageCount(), queue->getConsumerCount(), - context.getRequestId()); + queueName, queue->getMessageCount(), queue->getConsumerCount());//GRS, context.getRequestId()); } } -void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, +void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, bool nowait, const FieldTable& arguments){ @@ -219,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_ broker.getStore().bind(*exchange, *queue, routingKey, arguments); } } - if(!nowait) client.bindOk(context.getRequestId()); + if(!nowait) client.bindOk();//GRS context.getRequestId()); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -227,9 +222,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_ } void -BrokerAdapter::QueueHandlerImpl::unbind( - const MethodContext& context, - uint16_t /*ticket*/, +BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, const string& queueName, const string& exchangeName, const string& routingKey, @@ -245,17 +238,17 @@ BrokerAdapter::QueueHandlerImpl::unbind( broker.getStore().unbind(*exchange, *queue, routingKey, arguments); } - client.unbindOk(context.getRequestId()); + client.unbindOk();//GRS context.getRequestId()); } -void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = getQueue(queueName); int count = queue->purge(); - if(!nowait) client.purgeOk( count, context.getRequestId()); + if(!nowait) client.purgeOk( count);//GRS, context.getRequestId()); } -void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue, +void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); @@ -277,21 +270,20 @@ void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint } if(!nowait) - client.deleteOk(count, context.getRequestId()); + client.deleteOk(count);//GRS, context.getRequestId()); } -void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.qosOk(context.getRequestId()); + client.qosOk();//GRS context.getRequestId()); } -void BrokerAdapter::BasicHandlerImpl::consume( - const MethodContext& context, uint16_t /*ticket*/, +void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, bool nowait, const FieldTable& fields) @@ -308,29 +300,26 @@ void BrokerAdapter::BasicHandlerImpl::consume( channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())), newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) client.consumeOk(newTag, context.getRequestId()); + if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId()); //allow messages to be dispatched if required as there is now a consumer: queue->requestDispatch(); } -void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ +void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) client.cancelOk(consumerTag, context.getRequestId()); + if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId()); } -void BrokerAdapter::BasicHandlerImpl::publish( - const MethodContext& context, uint16_t /*ticket*/, +void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate) { Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - BasicMessage* msg = new BasicMessage( - &connection, exchangeName, routingKey, mandatory, immediate, - context.methodBody); + BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); channel.handlePublish(msg); }else{ throw ChannelException( @@ -338,45 +327,47 @@ void BrokerAdapter::BasicHandlerImpl::publish( } } -void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = getQueue(queueName); GetAdapter out(adapter, queue, "", connection.getFrameMax()); if(!channel.get(out, queue, !noAck)){ string clusterId;//not used, part of an imatix hack - client.getEmpty(clusterId, context.getRequestId()); + client.getEmpty(clusterId);//GRS, context.getRequestId()); } } -void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){ +void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ channel.ack(deliveryTag, multiple); } -void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ +void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) +{ channel.recover(requeue); } -void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::select() +{ channel.startTx(); - client.selectOk(context.getRequestId()); + client.selectOk();//GRS context.getRequestId()); } -void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::commit() +{ channel.commit(); - client.commitOk(context.getRequestId()); + client.commitOk();//GRS context.getRequestId()); } -void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ - +void BrokerAdapter::TxHandlerImpl::rollback() +{ channel.rollback(); - client.rollbackOk(context.getRequestId()); + client.rollbackOk();//GRS context.getRequestId()); channel.recover(false); } -void -BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) +void BrokerAdapter::ChannelHandlerImpl::ok() { //no specific action required, generic response handling should be sufficient } @@ -385,27 +376,36 @@ BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) // // Message class method handlers // -void -BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) +void BrokerAdapter::ChannelHandlerImpl::ping() { - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); client.pong(); } void -BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::pong() { - client.ok(context.getRequestId()); + client.ok();//GRS context.getRequestId()); } -void -BrokerAdapter::ChannelHandlerImpl::resume( - const MethodContext&, - const string& /*channel*/ ) +void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } +void BrokerAdapter::setResponseTo(RequestId r) +{ + basicHandler.client.setResponseTo(r); + channelHandler.client.setResponseTo(r); + exchangeHandler.client.setResponseTo(r); + bindingHandler.client.setResponseTo(r); + messageHandler.client.setResponseTo(r); + queueHandler.client.setResponseTo(r); + txHandler.client.setResponseTo(r); + dtxHandler.setResponseTo(r); +} + + }} // namespace qpid::broker |