diff options
| author | Alan Conway <aconway@apache.org> | 2008-03-18 21:31:08 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-03-18 21:31:08 +0000 |
| commit | 36e23bcefbf0a6893370cb041bd05a662f0b2758 (patch) | |
| tree | 601d29d88e873ac4d58da3cdb2753f02b64998bc /cpp/src/qpid/sys/AsynchIOAcceptor.cpp | |
| parent | eac0911169b24e708637572fe6b5a8283b3f49e0 (diff) | |
| download | qpid-python-36e23bcefbf0a6893370cb041bd05a662f0b2758.tar.gz | |
Make AsyncIOAcceptor multi-protocol:
- ConnectionCodec interface replaces ConnectionInputHandle, moves encoding/decoding out of AsyncIOAcceptor.
- ConnectionCodec::Factory replaces ConnectionInputHandlerFactory
- Acceptor creates version-specific ConnectionCodec based on protocol header.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@638590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOAcceptor.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 200 |
1 files changed, 57 insertions, 143 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index c24205f53e..56d7c6e1f3 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -27,12 +27,8 @@ #include "Thread.h" #include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/AMQDataBlock.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -40,6 +36,7 @@ #include <queue> #include <vector> #include <memory> +#include <ostream> namespace qpid { namespace sys { @@ -53,10 +50,8 @@ class AsynchIOAcceptor : public Acceptor { public: AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} - void run(ConnectionInputHandlerFactory* factory); - ConnectionInputHandler* connect( - const std::string& host, int16_t port, - ConnectionInputHandlerFactory* factory); + void run(ConnectionCodec::Factory*); + void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*); void shutdown(); @@ -64,13 +59,12 @@ class AsynchIOAcceptor : public Acceptor { std::string getHost() const; private: - void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*); + void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) { - return - Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); } AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : @@ -88,48 +82,43 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { +class AsynchIOHandler : public OutputControl { AsynchIO* aio; - ConnectionInputHandler* inputHandler; - std::queue<framing::AMQFrame> frameQueue; - Mutex frameQueueLock; - bool frameQueueClosed; - bool isInitiated; + ConnectionCodec::Factory* factory; + ConnectionCodec* codec; bool readError; std::string identifier; bool isClient; - void write(const framing::AMQDataBlock&); + void write(const framing::ProtocolInitiation&); public: AsynchIOHandler() : - inputHandler(0), - frameQueueClosed(false), - isInitiated(false), + aio(0), + factory(0), + codec(0), readError(false), isClient(false) {} ~AsynchIOHandler() { - if (inputHandler) - inputHandler->closed(); - delete inputHandler; + if (codec) + codec->closed(); + delete codec; } void setClient() { isClient = true; } - - void init(AsynchIO* a, ConnectionInputHandler* h) { + + void init(AsynchIO* a, ConnectionCodec::Factory* f) { aio = a; - inputHandler = h; + factory = f; identifier = aio->getSocket().getPeerAddress(); + } // Output side - void send(framing::AMQFrame&); void close(); void activateOutput(); - void initiated(const framing::ProtocolInitiation&); - // Input side void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); @@ -142,10 +131,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void closedSocket(AsynchIO& aio, const Socket& s); }; -void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { - +void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async, s.getPeerAddress()); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -153,8 +140,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); - + async->init(aio, f); // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); @@ -171,7 +157,7 @@ std::string AsynchIOAcceptor::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { +void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) { Dispatcher d(poller); AsynchAcceptor acceptor(listener, @@ -193,13 +179,13 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { } } -ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) +void AsynchIOAcceptor::connect( + const std::string& host, int16_t port, ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); AsynchIOHandler* async = new AsynchIOHandler; async->setClient(); - ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress()); AsynchIO* aio = new AsynchIO(*socket, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -207,14 +193,12 @@ ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16 boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); - + async->init(aio, f); // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); } aio->start(poller); - return handler; } @@ -225,8 +209,9 @@ void AsynchIOAcceptor::shutdown() { } -void AsynchIOHandler::write(const framing::AMQDataBlock& data) +void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { + QPID_LOG(debug, "SENT [" << identifier << "] INIT( " << data << ")"); AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); if (!buff) buff = new Buff; @@ -236,68 +221,45 @@ void AsynchIOHandler::write(const framing::AMQDataBlock& data) aio->queueWrite(buff); } -// Output side -void AsynchIOHandler::send(framing::AMQFrame& frame) { - // TODO: Need to find out if we are in the callback context, - // in the callback thread if so we can go further than just queuing the frame - // to be handled later - { - ScopedLock<Mutex> l(frameQueueLock); - // Ignore anything seen after closing - if (!frameQueueClosed) - frameQueue.push(frame); - } - - // Activate aio for writing here - aio->notifyPendingWrite(); -} - -void AsynchIOHandler::close() { - ScopedLock<Mutex> l(frameQueueLock); - frameQueueClosed = true; -} - void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } -void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi) -{ - write(pi); -} - // Input side void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { return; } - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - if(isInitiated){ - framing::AMQFrame frame; - try{ - while(frame.decode(in)) { - QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - inputHandler->received(frame); - } + size_t decoded = 0; + if (codec) { // Already initiated + try { + decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; aio->queueWriteClose(); } }else{ + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); framing::ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - QPID_LOG(debug, "INIT [" << identifier << "]"); - inputHandler->initiated(protocolInit); - isInitiated = true; + if (protocolInit.decode(in)) { + decoded = in.getPosition(); + QPID_LOG(debug, "RECV [" << identifier << "] INIT( " << protocolInit << ")"); + codec = factory->create(protocolInit.getVersion(), *this, identifier); + if (!codec) { + // FIXME aconway 2008-03-18: send valid version header & close connection. + // FIXME aconway 2008-03-18: exception type + throw Exception( + QPID_MSG("Protocol version not supported: " << protocolInit)); + } } } // TODO: unreading needs to go away, and when we can cope // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { + if (decoded != size_t(buff->dataCount)) { // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); + buff->dataStart += decoded; + buff->dataCount -= decoded; aio->unread(buff); } else { // Give whole buffer back to aio subsystem @@ -307,7 +269,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { void AsynchIOHandler::eof(AsynchIO&) { QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); - inputHandler->closed(); + if (codec) codec->closed(); aio->queueWriteClose(); } @@ -331,70 +293,22 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ - if (isClient && !isInitiated) { - //get & write protocol header from upper layers - write(inputHandler->getInitiation()); - isInitiated = true; + if (isClient && codec == 0) { + codec = factory->create(*this, identifier); + write(framing::ProtocolInitiation(codec->getVersion())); return; } - ScopedLock<Mutex> l(frameQueueLock); - - if (frameQueue.empty()) { - // At this point we know that we're write idling the connection - // so tell the input handler to queue any available output: - inputHandler->doOutput(); - //if still no frames, theres nothing to do: - if (frameQueue.empty()) return; - } - - do { + if (codec == 0) return; + while (codec->canEncode()) { // Try and get a queued buffer if not then construct new one AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - int buffUsed = 0; - - framing::AMQFrame frame = frameQueue.front(); - int frameSize = frame.size(); - int framesEncoded=0; - while (frameSize <= int(out.available())) { - frameQueue.pop(); - - // Encode output frame - frame.encode(out); - ++framesEncoded; - buffUsed += frameSize; - QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); - - if (frameQueue.empty()) { - //if we have run out of frames, allow upper layers to - //generate more - if (!frameQueueClosed) { - inputHandler->doOutput(); - } - if (frameQueue.empty()) { - //if there are still no frames, we have no more to - //do - break; - } - } - frame = frameQueue.front(); - frameSize = frame.size(); - } - QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames "); - - // If frame was egregiously large complain - if (frameSize > buff->byteCount) - throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); - - buff->dataCount = buffUsed; + if (!buff) buff = new Buff; + size_t encoded=codec->encode(buff->bytes, buff->byteCount); + buff->dataCount = encoded; aio->queueWrite(buff); - } while (!frameQueue.empty()); - - if (frameQueueClosed) { - aio->queueWriteClose(); } + if (codec->isClosed()) + aio->queueWriteClose(); } }} // namespace qpid::sys |
