diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-15 21:56:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-15 21:56:23 +0000 |
| commit | ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08 (patch) | |
| tree | 3b69ec6c589ff8edd628f2e218589180cbca005b /cpp/lib/broker | |
| parent | 5aaad510dc978dc09f92c774c81255b7af6b8b68 (diff) | |
| download | qpid-python-ef1469a7ea1f54f266aee8f2899b7cd0c7e07d08.tar.gz | |
* Client & broker using Requester/Responder to manage request/response IDs.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496511 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker')
| -rw-r--r-- | cpp/lib/broker/Broker.cpp | 2 | ||||
| -rw-r--r-- | cpp/lib/broker/Broker.h | 13 | ||||
| -rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 257 | ||||
| -rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.h | 35 |
4 files changed, 184 insertions, 123 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 6a8b1f8538..c2117eaf23 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -23,6 +23,7 @@ #include "AMQFrame.h" #include "DirectExchange.h" +#include "TopicExchange.h" #include "FanOutExchange.h" #include "HeadersExchange.h" #include "MessageStoreModule.h" @@ -102,3 +103,4 @@ const int16_t Broker::DEFAULT_PORT(5672); }} // namespace qpid::broker + diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index f831b680e9..ad7fbb1eca 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -29,6 +29,15 @@ #include <SharedObject.h> #include <MessageStore.h> #include <AutoDelete.h> +#include "Requester.h" +#include "Responder.h" +#include <ExchangeRegistry.h> +#include <BrokerChannel.h> +#include <ConnectionToken.h> +#include <DirectExchange.h> +#include <OutputHandler.h> +#include <ProtocolInitiation.h> +#include <QueueRegistry.h> namespace qpid { namespace broker { @@ -77,6 +86,8 @@ class Broker : public qpid::sys::Runnable, u_int32_t getTimeout() { return timeout; } u_int64_t getStagingThreshold() { return stagingThreshold; } AutoDelete& getCleaner() { return cleaner; } + qpid::framing::Requester& getRequester() { return requester; } + qpid::framing::Responder& getResponder() { return responder; } private: Broker(const Configuration& config); @@ -89,6 +100,8 @@ class Broker : public qpid::sys::Runnable, u_int64_t stagingThreshold; AutoDelete cleaner; SessionHandlerFactoryImpl factory; + qpid::framing::Requester requester; + qpid::framing::Responder responder; }; }} diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index 905ac83b92..d7f6320535 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -19,11 +19,15 @@ * */ #include <iostream> -#include <SessionHandlerImpl.h> -#include <FanOutExchange.h> -#include <HeadersExchange.h> -#include <TopicExchange.h> -#include "assert.h" +#include <assert.h> + +#include "SessionHandlerImpl.h" + +#include "FanOutExchange.h" +#include "HeadersExchange.h" + +#include "Requester.h" +#include "Responder.h" using namespace boost; using namespace qpid::sys; @@ -42,6 +46,8 @@ SessionHandlerImpl::SessionHandlerImpl( exchanges(broker.getExchanges()), cleaner(broker.getCleaner()), settings(broker.getTimeout(), broker.getStagingThreshold()), + requester(broker.getRequester()), + responder(broker.getResponder()), basicHandler(new BasicHandlerImpl(this)), channelHandler(new ChannelHandlerImpl(this)), connectionHandler(new ConnectionHandlerImpl(this)), @@ -55,7 +61,7 @@ SessionHandlerImpl::SessionHandlerImpl( SessionHandlerImpl::~SessionHandlerImpl(){ - if (client != NULL) + if (client != NULL) delete client; } @@ -87,51 +93,87 @@ Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ return exchanges.get(name); } +void SessionHandlerImpl::handleMethod( + u_int16_t channel, qpid::framing::AMQBody::shared_ptr body) +{ + AMQMethodBody::shared_ptr method = + shared_polymorphic_cast<AMQMethodBody, AMQBody>(body); + try{ + method->invoke(*this, channel); + }catch(ChannelException& e){ + channels[channel]->close(); + channels.erase(channel); + client->getChannel().close( + channel, e.code, e.text, + method->amqpClassId(), method->amqpMethodId()); + }catch(ConnectionException& e){ + client->getConnection().close( + 0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); + }catch(std::exception& e){ + client->getConnection().close( + 0, 541/*internal error*/, e.what(), + method->amqpClassId(), method->amqpMethodId()); + } +} + void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ u_int16_t channel = frame->getChannel(); AMQBody::shared_ptr body = frame->getBody(); - AMQMethodBody::shared_ptr method; - switch(body->type()) { case REQUEST_BODY: - // responder.received(frame); + responder.received(AMQRequestBody::getData(body)); + handleMethod(channel, body); + break; case RESPONSE_BODY: - // requester.received(frame); - case METHOD_BODY: // - method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); - try{ - method->invoke(*this, channel); - }catch(ChannelException& e){ - channels[channel]->close(); - channels.erase(channel); - client->getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(ConnectionException& e){ - client->getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - string error(e.what()); - client->getConnection().close(0, 541/*internal error*/, error, method->amqpClassId(), method->amqpMethodId()); - } - break; - - case HEADER_BODY: - this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + // Must process responses before marking them received. + handleMethod(channel, body); + requester.processed(AMQResponseBody::getData(body)); + break; + // TODO aconway 2007-01-15: Leftover from 0-8 support, remove. + case METHOD_BODY: + handleMethod(channel, body); + break; + case HEADER_BODY: + handleHeader( + channel, shared_polymorphic_cast<AMQHeaderBody>(body)); break; - case CONTENT_BODY: - this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + case CONTENT_BODY: + handleContent( + channel, shared_polymorphic_cast<AMQContentBody>(body)); break; - case HEARTBEAT_BODY: - //channel must be 0 - this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + case HEARTBEAT_BODY: + assert(channel == 0); + handleHeartbeat( + shared_polymorphic_cast<AMQHeartbeatBody>(body)); break; } } +/** + * An OutputHandler that does request/response procssing before + * delgating to another OutputHandler. + */ +SessionHandlerImpl::Sender::Sender( + OutputHandler& oh, Requester& req, Responder& resp) + : out(oh), requester(req), responder(resp) +{} + +void SessionHandlerImpl::Sender::send(AMQFrame* frame) { + AMQBody::shared_ptr body = frame->getBody(); + u_int16_t type = body->type(); + if (type == REQUEST_BODY) + requester.sending(AMQRequestBody::getData(body)); + else if (type == RESPONSE_BODY) + responder.sending(AMQResponseBody::getData(body)); + out.send(frame); +} + void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){ - if (client == NULL) + if (client == 0) { client = new qpid::framing::AMQP_ClientProxy(context, header->getMajor(), header->getMinor()); @@ -280,7 +322,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::unbind( const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } @@ -335,9 +377,9 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t Queue::shared_ptr queue = parent->getQueue(queueName, channel); Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); if(exchange){ -// kpvdr - cannot use this any longer as routingKey is now const -// if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); -// exchange->bind(queue, routingKey, &arguments); + // kpvdr - cannot use this any longer as routingKey is now const + // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); + // exchange->bind(queue, routingKey, &arguments); string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); if(!nowait) parent->client->getQueue().bindOk(channel); @@ -483,25 +525,25 @@ SessionHandlerImpl::QueueHandlerImpl::unbind( const string& /*routingKey*/, const qpid::framing::FieldTable& /*arguments*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::ok( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::ping( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void SessionHandlerImpl::ChannelHandlerImpl::pong( u_int16_t /*channel*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } void @@ -509,148 +551,149 @@ SessionHandlerImpl::ChannelHandlerImpl::resume( u_int16_t /*channel*/, const string& /*channelId*/ ) { - assert(0); // FIXME aconway 2007-01-04: 0-9 feature + assert(0); // FIXME aconway 2007-01-04: 0-9 feature } // Message class method handlers void SessionHandlerImpl::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) + const string& /*reference*/, + const string& /*bytes*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::cancel( u_int16_t /*channel*/, - const string& /*destination*/ ) + const string& /*destination*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) + const string& /*reference*/, + const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) + const string& /*reference*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::consume( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noLocal*/, - bool /*noAck*/, - bool /*exclusive*/, - const qpid::framing::FieldTable& /*filter*/ ) + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noLocal*/, + bool /*noAck*/, + bool /*exclusive*/, + const qpid::framing::FieldTable& /*filter*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::get( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*queue*/, - const string& /*destination*/, - bool /*noAck*/ ) + u_int16_t /*ticket*/, + const string& /*queue*/, + const string& /*destination*/, + bool /*noAck*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) + u_int64_t /*value*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) + const string& /*reference*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::qos( u_int16_t /*channel*/, - u_int32_t /*prefetchSize*/, - u_int16_t /*prefetchCount*/, - bool /*global*/ ) + u_int32_t /*prefetchSize*/, + u_int16_t /*prefetchCount*/, + bool /*global*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::recover( u_int16_t /*channel*/, - bool /*requeue*/ ) + bool /*requeue*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) + u_int16_t /*code*/, + const string& /*text*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) + const string& /*reference*/, + const string& /*identifier*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } void SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool /*immediate*/, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& /*exchange*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) + u_int16_t /*ticket*/, + const string& /*destination*/, + bool /*redelivered*/, + bool /*immediate*/, + u_int64_t /*ttl*/, + u_int8_t /*priority*/, + u_int64_t /*timestamp*/, + u_int8_t /*deliveryMode*/, + u_int64_t /*expiration*/, + const string& /*exchange*/, + const string& /*routingKey*/, + const string& /*messageId*/, + const string& /*correlationId*/, + const string& /*replyTo*/, + const string& /*contentType*/, + const string& /*contentEncoding*/, + const string& /*userId*/, + const string& /*appId*/, + const string& /*transactionId*/, + const string& /*securityToken*/, + const qpid::framing::FieldTable& /*applicationHeaders*/, + qpid::framing::Content /*body*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } }} + diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h index 08b05a11b6..070bd1266e 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -24,28 +24,20 @@ #include <map> #include <sstream> #include <vector> -#include <exception> + #include <AMQFrame.h> #include <AMQP_ClientProxy.h> #include <AMQP_ServerOperations.h> -#include <AutoDelete.h> -#include <ExchangeRegistry.h> -#include <BrokerChannel.h> -#include <ConnectionToken.h> -#include <DirectExchange.h> -#include <OutputHandler.h> -#include <ProtocolInitiation.h> -#include <QueueRegistry.h> #include <sys/SessionContext.h> #include <sys/SessionHandler.h> #include <sys/TimeoutHandler.h> -#include <TopicExchange.h> #include "Broker.h" +#include "Exception.h" namespace qpid { namespace broker { -struct ChannelException : public std::exception { +struct ChannelException : public qpid::Exception { u_int16_t code; string text; ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} @@ -53,7 +45,7 @@ struct ChannelException : public std::exception { const char* what() const throw() { return text.c_str(); } }; -struct ConnectionException : public std::exception { +struct ConnectionException : public qpid::Exception { u_int16_t code; string text; ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} @@ -75,13 +67,25 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, { typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - + class Sender : public qpid::framing::OutputHandler { + public: + Sender(qpid::framing::OutputHandler&, + qpid::framing::Requester&, qpid::framing::Responder&); + void send(qpid::framing::AMQFrame* frame); + private: + OutputHandler& out; + qpid::framing::Requester& requester; + qpid::framing::Responder& responder; + }; + qpid::sys::SessionContext* context; qpid::framing::AMQP_ClientProxy* client; QueueRegistry& queues; ExchangeRegistry& exchanges; AutoDelete& cleaner; Settings settings; + qpid::framing::Requester& requester; + qpid::framing::Responder& responder; std::auto_ptr<BasicHandler> basicHandler; std::auto_ptr<ChannelHandler> channelHandler; std::auto_ptr<ConnectionHandler> connectionHandler; @@ -98,6 +102,7 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); + void handleMethod(u_int16_t channel, qpid::framing::AMQBody::shared_ptr body); void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); Channel* getChannel(u_int16_t channel); @@ -371,8 +376,6 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); } }; -} -} - +}} #endif |
