diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-02 22:03:10 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-02 22:03:10 +0000 |
| commit | b5c270f10496f522ef6a03a8fa60f85d55c9187d (patch) | |
| tree | 714e7abf7ba591d00232d821440e51461175cb9e /cpp/lib/broker/BrokerAdapter.cpp | |
| parent | 750f272ac99e8c830807affb3ae68ab0beeca63f (diff) | |
| download | qpid-python-b5c270f10496f522ef6a03a8fa60f85d55c9187d.tar.gz | |
* cpp/lib/common/framing/MethodContext.h: Reduced MethodContext to
ChannelAdapter and Method Body. Request ID comes from body,
ChannelAdapter is used to send frames, not OutputHandler.
* cpp/lib/common/framing/ChannelAdapter.h,.cpp: Removed context member.
Context is per-method not per-channel.
* cpp/lib/broker/*: Replace direct use of OutputHandler and ChannelId
with MethodContext (for responses) or ChannelAdapter (for requests.)
Use context request-ID to construct responses, send all bodies via
ChannelAdapter.
* cpp/lib/broker/BrokerAdapter.cpp: Link broker::Channel to BrokerAdapter.
* cpp/lib/broker/*: Remove unnecessary ProtocolVersion parameters.
Fix bogus signatures: ProtocolVersion* -> const ProtocolVersion&
* Cosmetic changes, many files:
- fixed indentation, broke long lines.
- removed unnecessary qpid:: prefixes.
* broker/BrokerAdapter,BrokerChannel: Merged BrokerAdapter into
broker::channel.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@502767 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerAdapter.cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 300 |
1 files changed, 53 insertions, 247 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 73ece8b264..10e386ff41 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -28,168 +28,20 @@ namespace broker { using namespace qpid; using namespace qpid::framing; -typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; +typedef std::vector<Queue::shared_ptr> QueueVector; -class BrokerAdapter::ServerOps : public AMQP_ServerOperations -{ - public: - ServerOps(Channel& ch, Connection& c, Broker& b) : - basicHandler(ch, c, b), - channelHandler(ch, c, b), - connectionHandler(ch, c, b), - exchangeHandler(ch, c, b), - messageHandler(ch, c, b), - queueHandler(ch, c, b), - txHandler(ch, c, b) - {} - - ChannelHandler* getChannelHandler() { return &channelHandler; } - ConnectionHandler* getConnectionHandler() { return &connectionHandler; } - BasicHandler* getBasicHandler() { return &basicHandler; } - ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } - QueueHandler* getQueueHandler() { return &queueHandler; } - TxHandler* getTxHandler() { return &txHandler; } - MessageHandler* getMessageHandler() { return &messageHandler; } - AccessHandler* getAccessHandler() { - throw ConnectionException(540, "Access class not implemented"); } - FileHandler* getFileHandler() { - throw ConnectionException(540, "File class not implemented"); } - StreamHandler* getStreamHandler() { - throw ConnectionException(540, "Stream class not implemented"); } - DtxHandler* getDtxHandler() { - throw ConnectionException(540, "Dtx class not implemented"); } - TunnelHandler* getTunnelHandler() { - throw ConnectionException(540, "Tunnel class not implemented"); } - - private: - struct CoreRefs { - CoreRefs(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b) {} - - Channel& channel; - Connection& connection; - Broker& broker; - }; - - class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler { - public: - ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - - void startOk(const MethodContext& context, - const qpid::framing::FieldTable& clientProperties, - const std::string& mechanism, const std::string& response, - const std::string& locale); - void secureOk(const MethodContext& context, const std::string& response); - void tuneOk(const MethodContext& context, u_int16_t channelMax, - u_int32_t frameMax, u_int16_t heartbeat); - void open(const MethodContext& context, const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(const MethodContext& context, u_int16_t replyCode, - const std::string& replyText, - u_int16_t classId, u_int16_t methodId); - void closeOk(const MethodContext& context); - }; - - class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ - public: - ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void open(const MethodContext& context, const std::string& outOfBand); - void flow(const MethodContext& context, bool active); - void flowOk(const MethodContext& context, bool active); - void ok( const MethodContext& context ); - void ping( const MethodContext& context ); - void pong( const MethodContext& context ); - void resume( const MethodContext& context, const std::string& channelId ); - void close(const MethodContext& context, u_int16_t replyCode, const - std::string& replyText, u_int16_t classId, u_int16_t methodId); - void closeOk(const MethodContext& context); - }; - - class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ - public: - ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void declare(const MethodContext& context, u_int16_t ticket, - const std::string& exchange, const std::string& type, - bool passive, bool durable, bool autoDelete, - bool internal, bool nowait, - const qpid::framing::FieldTable& arguments); - void delete_(const MethodContext& context, u_int16_t ticket, - const std::string& exchange, bool ifUnused, bool nowait); - }; - - class QueueHandlerImpl : private CoreRefs, public QueueHandler{ - public: - QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void declare(const MethodContext& context, u_int16_t ticket, const std::string& queue, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, - const qpid::framing::FieldTable& arguments); - void bind(const MethodContext& context, u_int16_t ticket, const std::string& queue, - const std::string& exchange, const std::string& routingKey, - bool nowait, const qpid::framing::FieldTable& arguments); - void unbind(const MethodContext& context, - u_int16_t ticket, - const std::string& queue, - const std::string& exchange, - const std::string& routingKey, - const qpid::framing::FieldTable& arguments ); - void purge(const MethodContext& context, u_int16_t ticket, const std::string& queue, - bool nowait); - void delete_(const MethodContext& context, u_int16_t ticket, const std::string& queue, - bool ifUnused, bool ifEmpty, - bool nowait); - }; - - class BasicHandlerImpl : private CoreRefs, public BasicHandler{ - public: - BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void qos(const MethodContext& context, u_int32_t prefetchSize, - u_int16_t prefetchCount, bool global); - void consume( - const MethodContext& context, u_int16_t ticket, const std::string& queue, - const std::string& consumerTag, bool noLocal, bool noAck, - bool exclusive, bool nowait, - const qpid::framing::FieldTable& fields); - void cancel(const MethodContext& context, const std::string& consumerTag, - bool nowait); - void publish(const MethodContext& context, u_int16_t ticket, - const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); - void get(const MethodContext& context, u_int16_t ticket, const std::string& queue, - bool noAck); - void ack(const MethodContext& context, u_int64_t deliveryTag, bool multiple); - void reject(const MethodContext& context, u_int64_t deliveryTag, bool requeue); - void recover(const MethodContext& context, bool requeue); - }; - - class TxHandlerImpl : private CoreRefs, public TxHandler{ - public: - TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} - void select(const MethodContext& context); - void commit(const MethodContext& context); - void rollback(const MethodContext& context); - }; - - BasicHandlerImpl basicHandler; - ChannelHandlerImpl channelHandler; - ConnectionHandlerImpl connectionHandler; - ExchangeHandlerImpl exchangeHandler; - MessageHandlerImpl messageHandler; - QueueHandlerImpl queueHandler; - TxHandlerImpl txHandler; - -}; - -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::startOk( - const MethodContext& context , const FieldTable& /*clientProperties*/, const string& /*mechanism*/, +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk( + const MethodContext& context , const FieldTable& /*clientProperties*/, + const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/){ connection.client->getConnection().tune( context, 100, connection.getFrameMax(), connection.getHeartbeat()); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::secureOk(const MethodContext&, const string& /*response*/){} +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk( + const MethodContext&, const string& /*response*/){} -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk( +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat) { @@ -197,12 +49,12 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::tuneOk( connection.setHeartbeat(heartbeat); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ string knownhosts; connection.client->getConnection().openOk(context, knownhosts); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close( +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { @@ -210,21 +62,21 @@ void BrokerAdapter::ServerOps::ConnectionHandlerImpl::close( connection.getOutput().close(); } -void BrokerAdapter::ServerOps::ConnectionHandlerImpl::closeOk(const MethodContext&){ +void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ connection.getOutput().close(); } -void BrokerAdapter::ServerOps::ChannelHandlerImpl::open( +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open( const MethodContext& context, const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 connection.client->getChannel().openOk(context, std::string()/* ID */); } -void BrokerAdapter::ServerOps::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ServerOps::ChannelHandlerImpl::close( +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) @@ -234,13 +86,13 @@ void BrokerAdapter::ServerOps::ChannelHandlerImpl::close( connection.closeChannel(channel.getId()); } -void BrokerAdapter::ServerOps::ChannelHandlerImpl::closeOk(const MethodContext&){} +void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ +void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ if(passive){ if(!broker.getExchanges().get(exchange)) { @@ -265,17 +117,17 @@ void BrokerAdapter::ServerOps::ExchangeHandlerImpl::declare(const MethodContext& } } -void BrokerAdapter::ServerOps::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); if(!nowait) connection.client->getExchange().deleteOk(context); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ +void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = connection.getQueue(name, channel.getId()); @@ -308,9 +160,9 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::declare(const MethodContext& co } } -void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ +void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); @@ -325,7 +177,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::bind(const MethodContext& conte } void -BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( +BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, @@ -344,15 +196,15 @@ BrokerAdapter::ServerOps::QueueHandlerImpl::unbind( connection.client->getQueue().unbindOk(context); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); if(!nowait) connection.client->getQueue().purgeOk(context, count); } -void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ +void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); @@ -363,7 +215,7 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& co }else{ //remove the queue from the list of exclusive queues if necessary if(q->isExclusiveOwner(&connection)){ - queue_iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); + QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); } count = q->getMessageCount(); @@ -377,14 +229,14 @@ void BrokerAdapter::ServerOps::QueueHandlerImpl::delete_(const MethodContext& co -void BrokerAdapter::ServerOps::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); connection.client->getBasic().qosOk(context); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, @@ -412,19 +264,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::consume( } -void BrokerAdapter::ServerOps::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag); } -void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u_int16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( + const MethodContext& context, u_int16_t /*ticket*/, + const string& exchangeName, const string& routingKey, + bool mandatory, bool immediate) +{ Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); + BasicMessage* msg = new BasicMessage( + &connection, exchangeName, routingKey, mandatory, immediate, + context.methodBody); channel.handlePublish(msg, exchange); }else{ throw ChannelException( @@ -432,7 +288,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::publish(const MethodContext&, u } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, !noAck)){ string clusterId;//not used, part of an imatix hack @@ -441,7 +297,7 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::get(const MethodContext& contex } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ try{ channel.ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -449,23 +305,23 @@ void BrokerAdapter::ServerOps::BasicHandlerImpl::ack(const MethodContext&, u_int } } -void BrokerAdapter::ServerOps::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::ServerOps::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ +void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ channel.recover(requeue); } -void BrokerAdapter::ServerOps::TxHandlerImpl::select(const MethodContext& context){ +void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ channel.begin(); connection.client->getTx().selectOk(context); } -void BrokerAdapter::ServerOps::TxHandlerImpl::commit(const MethodContext& context){ +void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ channel.commit(); connection.client->getTx().commitOk(context); } -void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& context){ +void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ channel.rollback(); connection.client->getTx().rollbackOk(context); @@ -473,82 +329,32 @@ void BrokerAdapter::ServerOps::TxHandlerImpl::rollback(const MethodContext& cont } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ok( const MethodContext& ) +BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) { //no specific action required, generic response handling should be sufficient } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::ping( const MethodContext& context) +BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) { connection.client->getChannel().ok(context); connection.client->getChannel().pong(context); } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::pong( const MethodContext& context) +BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) { connection.client->getChannel().ok(context); } void -BrokerAdapter::ServerOps::ChannelHandlerImpl::resume( +BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( const MethodContext&, const string& /*channel*/ ) { assert(0); // FIXME aconway 2007-01-04: 0-9 feature } -BrokerAdapter::BrokerAdapter( - std::auto_ptr<Channel> ch, Connection& c, Broker& b -) : - channel(ch), - connection(c), - broker(b), - serverOps(new ServerOps(*channel,c,b)) -{ - init(channel->getId(), c.getOutput(), channel->getVersion()); -} - -void BrokerAdapter::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - method->invoke(*serverOps, context); - }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.client->getConnection().close( - context, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.client->getConnection().close( - context, 541/*internal error*/, e.what(), - method->amqpClassId(), method->amqpMethodId()); - } -} - -void BrokerAdapter::handleHeader(AMQHeaderBody::shared_ptr body) { - channel->handleHeader(body); -} - -void BrokerAdapter::handleContent(AMQContentBody::shared_ptr body) { - channel->handleContent(body); -} - -void BrokerAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { - // TODO aconway 2007-01-17: Implement heartbeats. -} - - -bool BrokerAdapter::isOpen() const { - return channel->isOpen(); -} }} // namespace qpid::broker |
