summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerAdapter.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-02 22:03:10 +0000
committerAlan Conway <aconway@apache.org>2007-02-02 22:03:10 +0000
commitb5c270f10496f522ef6a03a8fa60f85d55c9187d (patch)
tree714e7abf7ba591d00232d821440e51461175cb9e /cpp/lib/broker/BrokerAdapter.cpp
parent750f272ac99e8c830807affb3ae68ab0beeca63f (diff)
downloadqpid-python-b5c270f10496f522ef6a03a8fa60f85d55c9187d.tar.gz
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. Request ID comes from body, ChannelAdapter is used to send frames, not OutputHandler. * cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member. Context is per-method not per-channel. * cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId with MethodContext (for responses) or ChannelAdapter (for requests.) Use context request-ID to construct responses, send all bodies via ChannelAdapter. * cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter. * cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters. Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion& * Cosmetic changes, many files: - fixed indentation, broke long lines. - removed unnecessary qpid:: prefixes. * broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into broker::channel. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp300
1 files changed, 53 insertions, 247 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 73ece8b264..10e386ff41 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -28,168 +28,20 @@ namespace broker {
using namespace qpid;
using namespace qpid::framing;
-typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+typedef std::vector<Queue::shared_ptr> QueueVector;
-class BrokerAdapter::ServerOps : public AMQP_ServerOperations
-{
- public:
- ServerOps(Channel& ch, Connection& c, Broker& b) :
- basicHandler(ch, c, b),
- channelHandler(ch, c, b),
- connectionHandler(ch, c, b),
- exchangeHandler(ch, c, b),
- messageHandler(ch, c, b),
- queueHandler(ch, c, b),
- txHandler(ch, c, b)
- {}
-
- ChannelHandler* getChannelHandler() { return &channelHandler; }
- ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
- BasicHandler* getBasicHandler() { return &basicHandler; }
- ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
- QueueHandler* getQueueHandler() { return &queueHandler; }
- TxHandler* getTxHandler() { return &txHandler; }
- MessageHandler* getMessageHandler() { return &messageHandler; }
- AccessHandler* getAccessHandler() {
- throw ConnectionException(540, "Access class not implemented"); }
- FileHandler* getFileHandler() {
- throw ConnectionException(540, "File class not implemented"); }
- StreamHandler* getStreamHandler() {
- throw ConnectionException(540, "Stream class not implemented"); }
- DtxHandler* getDtxHandler() {
- throw ConnectionException(540, "Dtx class not implemented"); }
- TunnelHandler* getTunnelHandler() {
- throw ConnectionException(540, "Tunnel class not implemented"); }
-
- private:
- struct CoreRefs {
- CoreRefs(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b) {}
-
- Channel& channel;
- Connection& connection;
- Broker& broker;
- };
-
- class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler {
- public:
- ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
-
- void startOk(const MethodContext& context,
- const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const MethodContext& context, const std::string& response);
- void tuneOk(const MethodContext& context, u_int16_t channelMax,
- u_int32_t frameMax, u_int16_t heartbeat);
- void open(const MethodContext& context, const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(const MethodContext& context, u_int16_t replyCode,
- const std::string& replyText,
- u_int16_t classId, u_int16_t methodId);
- void closeOk(const MethodContext& context);
- };
-
- class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{
- public:
- ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void open(const MethodContext& context, const std::string& outOfBand);
- void flow(const MethodContext& context, bool active);
- void flowOk(const MethodContext& context, bool active);
- void ok( const MethodContext& context );
- void ping( const MethodContext& context );
- void pong( const MethodContext& context );
- void resume( const MethodContext& context, const std::string& channelId );
- void close(const MethodContext& context, u_int16_t replyCode, const
- std::string& replyText, u_int16_t classId, u_int16_t methodId);
- void closeOk(const MethodContext& context);
- };
-
- class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{
- public:
- ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, const std::string& type,
- bool passive, bool durable, bool autoDelete,
- bool internal, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void delete_(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, bool ifUnused, bool nowait);
- };
-
- class QueueHandlerImpl : private CoreRefs, public QueueHandler{
- public:
- QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void declare(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void bind(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- const std::string& exchange, const std::string& routingKey,
- bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(const MethodContext& context,
- u_int16_t ticket,
- const std::string& queue,
- const std::string& exchange,
- const std::string& routingKey,
- const qpid::framing::FieldTable& arguments );
- void purge(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool nowait);
- void delete_(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty,
- bool nowait);
- };
-
- class BasicHandlerImpl : private CoreRefs, public BasicHandler{
- public:
- BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void qos(const MethodContext& context, u_int32_t prefetchSize,
- u_int16_t prefetchCount, bool global);
- void consume(
- const MethodContext& context, u_int16_t ticket, const std::string& queue,
- const std::string& consumerTag, bool noLocal, bool noAck,
- bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(const MethodContext& context, const std::string& consumerTag,
- bool nowait);
- void publish(const MethodContext& context, u_int16_t ticket,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
- void get(const MethodContext& context, u_int16_t ticket, const std::string& queue,
- bool noAck);
- void ack(const MethodContext& context, u_int64_t deliveryTag, bool multiple);
- void reject(const MethodContext& context, u_int64_t deliveryTag, bool requeue);
- void recover(const MethodContext& context, bool requeue);
- };
-
- class TxHandlerImpl : private CoreRefs, public TxHandler{
- public:
- TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {}
- void select(const MethodContext& context);
- void commit(const MethodContext& context);
- void rollback(const MethodContext& context);
- };
-
- BasicHandlerImpl basicHandler;
- ChannelHandlerImpl channelHandler;
- ConnectionHandlerImpl connectionHandler;
- ExchangeHandlerImpl exchangeHandler;
- MessageHandlerImpl messageHandler;
- QueueHandlerImpl queueHandler;
- TxHandlerImpl txHandler;
-
-};
-
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk(
- const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/,
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext& context , const FieldTable& /*clientProperties*/,
+ const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/){
connection.client->getConnection().tune(
context, 100, connection.getFrameMax(), connection.getHeartbeat());
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){}
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk(
+ const MethodContext&, const string& /*response*/){}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk(
const MethodContext&, u_int16_t /*channelmax*/,
u_int32_t framemax, u_int16_t heartbeat)
{
@@ -197,12 +49,12 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk(
connection.setHeartbeat(heartbeat);
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){
string knownhosts;
connection.client->getConnection().openOk(context, knownhosts);
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
{
@@ -210,21 +62,21 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close(
connection.getOutput().close();
}
-void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){
+void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
connection.getOutput().close();
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::open(
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open(
const MethodContext& context, const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
connection.client->getChannel().openOk(context, std::string()/* ID */);
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close(
const MethodContext& context, u_int16_t /*replyCode*/,
const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/)
@@ -234,13 +86,13 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::close(
connection.closeChannel(channel.getId());
}
-void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&){}
+void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
+void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
if(passive){
if(!broker.getExchanges().get(exchange)) {
@@ -265,17 +117,17 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext&
}
}
-void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
+void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
//TODO: implement unused
broker.getExchanges().destroy(exchange);
if(!nowait) connection.client->getExchange().deleteOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
Queue::shared_ptr queue;
if (passive && !name.empty()) {
queue = connection.getQueue(name, channel.getId());
@@ -308,9 +160,9 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& co
}
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
@@ -325,7 +177,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte
}
void
-BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
+BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind(
const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
@@ -344,15 +196,15 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind(
connection.client->getQueue().unbindOk(context);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
int count = queue->purge();
if(!nowait) connection.client->getQueue().purgeOk(context, count);
}
-void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
+void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
ChannelException error(0, "");
int count(0);
Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
@@ -363,7 +215,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& co
}else{
//remove the queue from the list of exclusive queues if necessary
if(q->isExclusiveOwner(&connection)){
- queue_iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
+ QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
}
count = q->getMessageCount();
@@ -377,14 +229,14 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& co
-void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
connection.client->getBasic().qosOk(context);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
const MethodContext& context, u_int16_t /*ticket*/,
const string& queueName, const string& consumerTag,
bool noLocal, bool noAck, bool exclusive,
@@ -412,19 +264,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume(
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag);
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish(
+ const MethodContext& context, u_int16_t /*ticket*/,
+ const string& exchangeName, const string& routingKey,
+ bool mandatory, bool immediate)
+{
Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
if(exchange){
- BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate);
+ BasicMessage* msg = new BasicMessage(
+ &connection, exchangeName, routingKey, mandatory, immediate,
+ context.methodBody);
channel.handlePublish(msg, exchange);
}else{
throw ChannelException(
@@ -432,7 +288,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!connection.getChannel(channel.getId()).get(queue, !noAck)){
string clusterId;//not used, part of an imatix hack
@@ -441,7 +297,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& contex
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
try{
channel.ack(deliveryTag, multiple);
}catch(InvalidAckException& e){
@@ -449,23 +305,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int
}
}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
channel.recover(requeue);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
channel.begin();
connection.client->getTx().selectOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
channel.commit();
connection.client->getTx().commitOk(context);
}
-void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){
+void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
channel.rollback();
connection.client->getTx().rollbackOk(context);
@@ -473,82 +329,32 @@ void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& cont
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& )
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
{
//no specific action required, generic response handling should be sufficient
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context)
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
{
connection.client->getChannel().ok(context);
connection.client->getChannel().pong(context);
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context)
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
{
connection.client->getChannel().ok(context);
}
void
-BrokerAdapter::ServerOps::ChannelHandlerImpl::resume(
+BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume(
const MethodContext&,
const string& /*channel*/ )
{
assert(0); // FIXME aconway 2007-01-04: 0-9 feature
}
-BrokerAdapter::BrokerAdapter(
- std::auto_ptr<Channel> ch, Connection& c, Broker& b
-) :
- channel(ch),
- connection(c),
- broker(b),
- serverOps(new ServerOps(*channel,c,b))
-{
- init(channel->getId(), c.getOutput(), channel->getVersion());
-}
-
-void BrokerAdapter::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
-{
- try{
- method->invoke(*serverOps, context);
- }catch(ChannelException& e){
- connection.client->getChannel().close(
- context, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(ConnectionException& e){
- connection.client->getConnection().close(
- context, e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.client->getConnection().close(
- context, 541/*internal error*/, e.what(),
- method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) {
- channel->handleHeader(body);
-}
-
-void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) {
- channel->handleContent(body);
-}
-
-void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) {
- // TODO aconway 2007-01-17: Implement heartbeats.
-}
-
-
-bool BrokerAdapter::isOpen() const {
- return channel->isOpen();
-}
}} // namespace qpid::broker