diff options
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 200 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionCodec.h | 80 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionInputHandler.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionOutputHandler.h | 3 |
5 files changed, 142 insertions, 156 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 5eb1f1a500..1e7827e60c 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -24,13 +24,11 @@ #include <stdint.h> #include "qpid/SharedObject.h" +#include "ConnectionCodec.h" namespace qpid { namespace sys { -class ConnectionInputHandlerFactory; -class ConnectionInputHandler; - class Acceptor : public qpid::SharedObject<Acceptor> { public: @@ -38,10 +36,9 @@ class Acceptor : public qpid::SharedObject<Acceptor> virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(ConnectionInputHandlerFactory* factory) = 0; - virtual ConnectionInputHandler* connect( - const std::string& host, int16_t port, - ConnectionInputHandlerFactory* factory) = 0; + virtual void run(ConnectionCodec::Factory*) = 0; + virtual void connect( + const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0; /** Note: this function is async-signal safe */ virtual void shutdown() = 0; diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index c24205f53e..56d7c6e1f3 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -27,12 +27,8 @@ #include "Thread.h" #include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/AMQDataBlock.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -40,6 +36,7 @@ #include <queue> #include <vector> #include <memory> +#include <ostream> namespace qpid { namespace sys { @@ -53,10 +50,8 @@ class AsynchIOAcceptor : public Acceptor { public: AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} - void run(ConnectionInputHandlerFactory* factory); - ConnectionInputHandler* connect( - const std::string& host, int16_t port, - ConnectionInputHandlerFactory* factory); + void run(ConnectionCodec::Factory*); + void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*); void shutdown(); @@ -64,13 +59,12 @@ class AsynchIOAcceptor : public Acceptor { std::string getHost() const; private: - void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*); + void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) { - return - Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); } AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : @@ -88,48 +82,43 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { +class AsynchIOHandler : public OutputControl { AsynchIO* aio; - ConnectionInputHandler* inputHandler; - std::queue<framing::AMQFrame> frameQueue; - Mutex frameQueueLock; - bool frameQueueClosed; - bool isInitiated; + ConnectionCodec::Factory* factory; + ConnectionCodec* codec; bool readError; std::string identifier; bool isClient; - void write(const framing::AMQDataBlock&); + void write(const framing::ProtocolInitiation&); public: AsynchIOHandler() : - inputHandler(0), - frameQueueClosed(false), - isInitiated(false), + aio(0), + factory(0), + codec(0), readError(false), isClient(false) {} ~AsynchIOHandler() { - if (inputHandler) - inputHandler->closed(); - delete inputHandler; + if (codec) + codec->closed(); + delete codec; } void setClient() { isClient = true; } - - void init(AsynchIO* a, ConnectionInputHandler* h) { + + void init(AsynchIO* a, ConnectionCodec::Factory* f) { aio = a; - inputHandler = h; + factory = f; identifier = aio->getSocket().getPeerAddress(); + } // Output side - void send(framing::AMQFrame&); void close(); void activateOutput(); - void initiated(const framing::ProtocolInitiation&); - // Input side void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); @@ -142,10 +131,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void closedSocket(AsynchIO& aio, const Socket& s); }; -void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { - +void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async, s.getPeerAddress()); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -153,8 +140,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); - + async->init(aio, f); // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); @@ -171,7 +157,7 @@ std::string AsynchIOAcceptor::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { +void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) { Dispatcher d(poller); AsynchAcceptor acceptor(listener, @@ -193,13 +179,13 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { } } -ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) +void AsynchIOAcceptor::connect( + const std::string& host, int16_t port, ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); AsynchIOHandler* async = new AsynchIOHandler; async->setClient(); - ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress()); AsynchIO* aio = new AsynchIO(*socket, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -207,14 +193,12 @@ ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16 boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); - + async->init(aio, f); // Give connection some buffers to use for (int i = 0; i < 4; i++) { aio->queueReadBuffer(new Buff); } aio->start(poller); - return handler; } @@ -225,8 +209,9 @@ void AsynchIOAcceptor::shutdown() { } -void AsynchIOHandler::write(const framing::AMQDataBlock& data) +void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { + QPID_LOG(debug, "SENT [" << identifier << "] INIT( " << data << ")"); AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); if (!buff) buff = new Buff; @@ -236,68 +221,45 @@ void AsynchIOHandler::write(const framing::AMQDataBlock& data) aio->queueWrite(buff); } -// Output side -void AsynchIOHandler::send(framing::AMQFrame& frame) { - // TODO: Need to find out if we are in the callback context, - // in the callback thread if so we can go further than just queuing the frame - // to be handled later - { - ScopedLock<Mutex> l(frameQueueLock); - // Ignore anything seen after closing - if (!frameQueueClosed) - frameQueue.push(frame); - } - - // Activate aio for writing here - aio->notifyPendingWrite(); -} - -void AsynchIOHandler::close() { - ScopedLock<Mutex> l(frameQueueLock); - frameQueueClosed = true; -} - void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } -void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi) -{ - write(pi); -} - // Input side void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { return; } - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - if(isInitiated){ - framing::AMQFrame frame; - try{ - while(frame.decode(in)) { - QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - inputHandler->received(frame); - } + size_t decoded = 0; + if (codec) { // Already initiated + try { + decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; aio->queueWriteClose(); } }else{ + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); framing::ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - QPID_LOG(debug, "INIT [" << identifier << "]"); - inputHandler->initiated(protocolInit); - isInitiated = true; + if (protocolInit.decode(in)) { + decoded = in.getPosition(); + QPID_LOG(debug, "RECV [" << identifier << "] INIT( " << protocolInit << ")"); + codec = factory->create(protocolInit.getVersion(), *this, identifier); + if (!codec) { + // FIXME aconway 2008-03-18: send valid version header & close connection. + // FIXME aconway 2008-03-18: exception type + throw Exception( + QPID_MSG("Protocol version not supported: " << protocolInit)); + } } } // TODO: unreading needs to go away, and when we can cope // with multiple sub-buffers in the general buffer scheme, it will - if (in.available() != 0) { + if (decoded != size_t(buff->dataCount)) { // Adjust buffer for used bytes and then "unread them" - buff->dataStart += buff->dataCount-in.available(); - buff->dataCount = in.available(); + buff->dataStart += decoded; + buff->dataCount -= decoded; aio->unread(buff); } else { // Give whole buffer back to aio subsystem @@ -307,7 +269,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { void AsynchIOHandler::eof(AsynchIO&) { QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); - inputHandler->closed(); + if (codec) codec->closed(); aio->queueWriteClose(); } @@ -331,70 +293,22 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ - if (isClient && !isInitiated) { - //get & write protocol header from upper layers - write(inputHandler->getInitiation()); - isInitiated = true; + if (isClient && codec == 0) { + codec = factory->create(*this, identifier); + write(framing::ProtocolInitiation(codec->getVersion())); return; } - ScopedLock<Mutex> l(frameQueueLock); - - if (frameQueue.empty()) { - // At this point we know that we're write idling the connection - // so tell the input handler to queue any available output: - inputHandler->doOutput(); - //if still no frames, theres nothing to do: - if (frameQueue.empty()) return; - } - - do { + if (codec == 0) return; + while (codec->canEncode()) { // Try and get a queued buffer if not then construct new one AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - int buffUsed = 0; - - framing::AMQFrame frame = frameQueue.front(); - int frameSize = frame.size(); - int framesEncoded=0; - while (frameSize <= int(out.available())) { - frameQueue.pop(); - - // Encode output frame - frame.encode(out); - ++framesEncoded; - buffUsed += frameSize; - QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); - - if (frameQueue.empty()) { - //if we have run out of frames, allow upper layers to - //generate more - if (!frameQueueClosed) { - inputHandler->doOutput(); - } - if (frameQueue.empty()) { - //if there are still no frames, we have no more to - //do - break; - } - } - frame = frameQueue.front(); - frameSize = frame.size(); - } - QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames "); - - // If frame was egregiously large complain - if (frameSize > buff->byteCount) - throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); - - buff->dataCount = buffUsed; + if (!buff) buff = new Buff; + size_t encoded=codec->encode(buff->bytes, buff->byteCount); + buff->dataCount = encoded; aio->queueWrite(buff); - } while (!frameQueue.empty()); - - if (frameQueueClosed) { - aio->queueWriteClose(); } + if (codec->isClosed()) + aio->queueWriteClose(); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h new file mode 100644 index 0000000000..205596c709 --- /dev/null +++ b/cpp/src/qpid/sys/ConnectionCodec.h @@ -0,0 +1,80 @@ +#ifndef QPID_SYS_CONNECTION_CODEC_H +#define QPID_SYS_CONNECTION_CODEC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/framing/ProtocolVersion.h" +#include "OutputControl.h" +#include <memory> +#include <map> + +namespace qpid { + +namespace broker { class Broker; } + +namespace sys { + +/** + * Interface of coder/decoder for a connection of a specific protocol + * version. + */ +class ConnectionCodec { + public: + virtual ~ConnectionCodec() {} + + /** Decode from buffer, return number of bytes decoded. + * @return may be less than size if there was incomplete + * data at the end of the buffer. + */ + virtual size_t decode(const char* buffer, size_t size) = 0; + + + /** Encode into buffer, return number of bytes encoded */ + virtual size_t encode(const char* buffer, size_t size) = 0; + + /** Return true if we have data to encode */ + virtual bool canEncode() = 0; + + /** Network connection was closed from other end. */ + virtual void closed() = 0; + + virtual bool isClosed() const = 0; + + virtual framing::ProtocolVersion getVersion() const = 0; + + struct Factory { + virtual ~Factory() {} + + /** Return 0 if version unknown */ + virtual ConnectionCodec* create( + framing::ProtocolVersion, OutputControl&, const std::string& id + ) = 0; + + /** Return "preferred" codec for outbound connections. */ + virtual ConnectionCodec* create( + OutputControl&, const std::string& id + ) = 0; + }; +}; + +}} // namespace qpid::sys + +#endif /*!QPID_SYS_CONNECTION_CODEC_H*/ diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h index 1936b5ec50..a2c18d6d9a 100644 --- a/cpp/src/qpid/sys/ConnectionInputHandler.h +++ b/cpp/src/qpid/sys/ConnectionInputHandler.h @@ -22,8 +22,6 @@ #define _ConnectionInputHandler_ #include "qpid/framing/InputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" #include "OutputTask.h" #include "TimeoutHandler.h" @@ -31,12 +29,10 @@ namespace qpid { namespace sys { class ConnectionInputHandler : - public qpid::framing::InitiationHandler, public qpid::framing::InputHandler, public TimeoutHandler, public OutputTask { public: - virtual qpid::framing::ProtocolInitiation getInitiation() = 0; virtual void closed() = 0; }; diff --git a/cpp/src/qpid/sys/ConnectionOutputHandler.h b/cpp/src/qpid/sys/ConnectionOutputHandler.h index 13407d9b9d..5a60ae4998 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandler.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandler.h @@ -22,7 +22,6 @@ #define _ConnectionOutputHandler_ #include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" #include "OutputControl.h" namespace qpid { @@ -31,7 +30,7 @@ namespace sys { /** * Provides the output handler associated with a connection. */ -class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl, public framing::InitiationHandler +class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl { public: virtual void close() = 0; |
