summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp144
1 files changed, 72 insertions, 72 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 2a0aa9ffee..be43dacb27 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -53,36 +53,33 @@ ProtocolVersion BrokerAdapter::getVersion() const {
return connection.getVersion();
}
-void BrokerAdapter::ChannelHandlerImpl::open(
- const MethodContext& context, const string& /*outOfBand*/){
+void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(
- std::string()/* ID */, context.getRequestId());
+ client.openOk(std::string()/* ID */);//GRS, context.getRequestId());
}
-void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext& context, bool active){
+void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
channel.flow(active);
- client.flowOk(active, context.getRequestId());
+ client.flowOk(active);//GRS, context.getRequestId());
}
-void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){}
-void BrokerAdapter::ChannelHandlerImpl::close(
- const MethodContext& context, uint16_t /*replyCode*/,
+void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/,
const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
- client.closeOk(context.getRequestId());
+ client.closeOk();//GRS context.getRequestId());
// FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::ChannelHandlerImpl::closeOk(){}
-void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
+void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait,
const FieldTable& args){
@@ -107,31 +104,30 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u
}
}
if(!nowait){
- client.declareOk(context.getRequestId());
+ client.declareOk();//GRS context.getRequestId());
}
}
-void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
+void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/,
const string& name, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
if (exchange->isDurable()) broker.getStore().destroy(*exchange);
broker.getExchanges().destroy(name);
- if(!nowait) client.deleteOk(context.getRequestId());
+ if(!nowait) client.deleteOk();//GRS context.getRequestId());
}
-void BrokerAdapter::ExchangeHandlerImpl::query(const MethodContext& context, u_int16_t /*ticket*/, const string& name)
+void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
- client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs(), context.getRequestId());
+ client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());//GRS, context.getRequestId());
} catch (const ChannelException& e) {
- client.queryOk("", false, true, FieldTable(), context.getRequestId());
+ client.queryOk("", false, true, FieldTable());//GRS, context.getRequestId());
}
}
-void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& context,
- u_int16_t /*ticket*/,
+void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
@@ -148,22 +144,22 @@ void BrokerAdapter::BindingHandlerImpl::query(const framing::MethodContext& cont
}
if (!exchange) {
- client.queryOk(true, false, false, false, false, context.getRequestId());
+ client.queryOk(true, false, false, false, false);//GRS, context.getRequestId());
} else if (!queueName.empty() && !queue) {
- client.queryOk(false, true, false, false, false, context.getRequestId());
+ client.queryOk(false, true, false, false, false);//GRS, context.getRequestId());
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- client.queryOk(false, false, false, false, false, context.getRequestId());
+ client.queryOk(false, false, false, false, false);//GRS, context.getRequestId());
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched, context.getRequestId());
+ client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);//GRS, context.getRequestId());
}
}
-void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name,
+void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name,
bool passive, bool durable, bool exclusive,
bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
@@ -200,12 +196,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint
if (!nowait) {
string queueName = queue->getName();
client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount(),
- context.getRequestId());
+ queueName, queue->getMessageCount(), queue->getConsumerCount());//GRS, context.getRequestId());
}
}
-void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName,
+void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
const string& exchangeName, const string& routingKey, bool nowait,
const FieldTable& arguments){
@@ -219,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_
broker.getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
- if(!nowait) client.bindOk(context.getRequestId());
+ if(!nowait) client.bindOk();//GRS context.getRequestId());
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -227,9 +222,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_
}
void
-BrokerAdapter::QueueHandlerImpl::unbind(
- const MethodContext& context,
- uint16_t /*ticket*/,
+BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
const string& queueName,
const string& exchangeName,
const string& routingKey,
@@ -245,17 +238,17 @@ BrokerAdapter::QueueHandlerImpl::unbind(
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
}
- client.unbindOk(context.getRequestId());
+ client.unbindOk();//GRS context.getRequestId());
}
-void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = getQueue(queueName);
int count = queue->purge();
- if(!nowait) client.purgeOk( count, context.getRequestId());
+ if(!nowait) client.purgeOk( count);//GRS, context.getRequestId());
}
-void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue,
+void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
@@ -277,21 +270,20 @@ void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint
}
if(!nowait)
- client.deleteOk(count, context.getRequestId());
+ client.deleteOk(count);//GRS, context.getRequestId());
}
-void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.qosOk(context.getRequestId());
+ client.qosOk();//GRS context.getRequestId());
}
-void BrokerAdapter::BasicHandlerImpl::consume(
- const MethodContext& context, uint16_t /*ticket*/,
+void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
bool nowait, const FieldTable& fields)
@@ -308,29 +300,26 @@ void BrokerAdapter::BasicHandlerImpl::consume(
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) client.consumeOk(newTag, context.getRequestId());
+ if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId());
//allow messages to be dispatched if required as there is now a consumer:
queue->requestDispatch();
}
-void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
+ if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId());
}
-void BrokerAdapter::BasicHandlerImpl::publish(
- const MethodContext& context, uint16_t /*ticket*/,
+void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
const string& exchangeName, const string& routingKey,
bool mandatory, bool immediate)
{
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- BasicMessage* msg = new BasicMessage(
- &connection, exchangeName, routingKey, mandatory, immediate,
- context.methodBody);
+ BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
channel.handlePublish(msg);
}else{
throw ChannelException(
@@ -338,45 +327,47 @@ void BrokerAdapter::BasicHandlerImpl::publish(
}
}
-void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = getQueue(queueName);
GetAdapter out(adapter, queue, "", connection.getFrameMax());
if(!channel.get(out, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- client.getEmpty(clusterId, context.getRequestId());
+ client.getEmpty(clusterId);//GRS, context.getRequestId());
}
}
-void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){
+void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
channel.ack(deliveryTag, multiple);
}
-void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
+{
channel.recover(requeue);
}
-void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::select()
+{
channel.startTx();
- client.selectOk(context.getRequestId());
+ client.selectOk();//GRS context.getRequestId());
}
-void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::TxHandlerImpl::commit()
+{
channel.commit();
- client.commitOk(context.getRequestId());
+ client.commitOk();//GRS context.getRequestId());
}
-void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
-
+void BrokerAdapter::TxHandlerImpl::rollback()
+{
channel.rollback();
- client.rollbackOk(context.getRequestId());
+ client.rollbackOk();//GRS context.getRequestId());
channel.recover(false);
}
-void
-BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
+void BrokerAdapter::ChannelHandlerImpl::ok()
{
//no specific action required, generic response handling should be sufficient
}
@@ -385,27 +376,36 @@ BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
//
// Message class method handlers
//
-void
-BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
+void BrokerAdapter::ChannelHandlerImpl::ping()
{
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
client.pong();
}
void
-BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::ChannelHandlerImpl::pong()
{
- client.ok(context.getRequestId());
+ client.ok();//GRS context.getRequestId());
}
-void
-BrokerAdapter::ChannelHandlerImpl::resume(
- const MethodContext&,
- const string& /*channel*/ )
+void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/)
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
+void BrokerAdapter::setResponseTo(RequestId r)
+{
+ basicHandler.client.setResponseTo(r);
+ channelHandler.client.setResponseTo(r);
+ exchangeHandler.client.setResponseTo(r);
+ bindingHandler.client.setResponseTo(r);
+ messageHandler.client.setResponseTo(r);
+ queueHandler.client.setResponseTo(r);
+ txHandler.client.setResponseTo(r);
+ dtxHandler.setResponseTo(r);
+}
+
+
}} // namespace qpid::broker