summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-03-18 21:31:08 +0000
committerAlan Conway <aconway@apache.org>2008-03-18 21:31:08 +0000
commit36e23bcefbf0a6893370cb041bd05a662f0b2758 (patch)
tree601d29d88e873ac4d58da3cdb2753f02b64998bc /cpp/src/qpid/sys/AsynchIOAcceptor.cpp
parenteac0911169b24e708637572fe6b5a8283b3f49e0 (diff)
downloadqpid-python-36e23bcefbf0a6893370cb041bd05a662f0b2758.tar.gz
Make AsyncIOAcceptor multi-protocol:
- ConnectionCodec interface replaces ConnectionInputHandle, moves encoding/decoding out of AsyncIOAcceptor. - ConnectionCodec::Factory replaces ConnectionInputHandlerFactory - Acceptor creates version-specific ConnectionCodec based on protocol header. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@638590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOAcceptor.cpp')
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp200
1 files changed, 57 insertions, 143 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index c24205f53e..56d7c6e1f3 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -27,12 +27,8 @@
#include "Thread.h"
#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/AMQDataBlock.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -40,6 +36,7 @@
#include <queue>
#include <vector>
#include <memory>
+#include <ostream>
namespace qpid {
namespace sys {
@@ -53,10 +50,8 @@ class AsynchIOAcceptor : public Acceptor {
public:
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
- void run(ConnectionInputHandlerFactory* factory);
- ConnectionInputHandler* connect(
- const std::string& host, int16_t port,
- ConnectionInputHandlerFactory* factory);
+ void run(ConnectionCodec::Factory*);
+ void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*);
void shutdown();
@@ -64,13 +59,12 @@ class AsynchIOAcceptor : public Acceptor {
std::string getHost() const;
private:
- void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
+ void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
};
Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
{
- return
- Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
+ return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
}
AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
@@ -88,48 +82,43 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+class AsynchIOHandler : public OutputControl {
AsynchIO* aio;
- ConnectionInputHandler* inputHandler;
- std::queue<framing::AMQFrame> frameQueue;
- Mutex frameQueueLock;
- bool frameQueueClosed;
- bool isInitiated;
+ ConnectionCodec::Factory* factory;
+ ConnectionCodec* codec;
bool readError;
std::string identifier;
bool isClient;
- void write(const framing::AMQDataBlock&);
+ void write(const framing::ProtocolInitiation&);
public:
AsynchIOHandler() :
- inputHandler(0),
- frameQueueClosed(false),
- isInitiated(false),
+ aio(0),
+ factory(0),
+ codec(0),
readError(false),
isClient(false)
{}
~AsynchIOHandler() {
- if (inputHandler)
- inputHandler->closed();
- delete inputHandler;
+ if (codec)
+ codec->closed();
+ delete codec;
}
void setClient() { isClient = true; }
-
- void init(AsynchIO* a, ConnectionInputHandler* h) {
+
+ void init(AsynchIO* a, ConnectionCodec::Factory* f) {
aio = a;
- inputHandler = h;
+ factory = f;
identifier = aio->getSocket().getPeerAddress();
+
}
// Output side
- void send(framing::AMQFrame&);
void close();
void activateOutput();
- void initiated(const framing::ProtocolInitiation&);
-
// Input side
void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -142,10 +131,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
void closedSocket(AsynchIO& aio, const Socket& s);
};
-void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
-
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
AsynchIOHandler* async = new AsynchIOHandler;
- ConnectionInputHandler* handler = f->create(async, s.getPeerAddress());
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -153,8 +140,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, handler);
-
+ async->init(aio, f);
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
@@ -171,7 +157,7 @@ std::string AsynchIOAcceptor::getHost() const {
return listener.getSockname();
}
-void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) {
Dispatcher d(poller);
AsynchAcceptor
acceptor(listener,
@@ -193,13 +179,13 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
}
}
-ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+void AsynchIOAcceptor::connect(
+ const std::string& host, int16_t port, ConnectionCodec::Factory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
AsynchIOHandler* async = new AsynchIOHandler;
async->setClient();
- ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress());
AsynchIO* aio = new AsynchIO(*socket,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -207,14 +193,12 @@ ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, handler);
-
+ async->init(aio, f);
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
}
aio->start(poller);
- return handler;
}
@@ -225,8 +209,9 @@ void AsynchIOAcceptor::shutdown() {
}
-void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
+ QPID_LOG(debug, "SENT [" << identifier << "] INIT( " << data << ")");
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
if (!buff)
buff = new Buff;
@@ -236,68 +221,45 @@ void AsynchIOHandler::write(const framing::AMQDataBlock& data)
aio->queueWrite(buff);
}
-// Output side
-void AsynchIOHandler::send(framing::AMQFrame& frame) {
- // TODO: Need to find out if we are in the callback context,
- // in the callback thread if so we can go further than just queuing the frame
- // to be handled later
- {
- ScopedLock<Mutex> l(frameQueueLock);
- // Ignore anything seen after closing
- if (!frameQueueClosed)
- frameQueue.push(frame);
- }
-
- // Activate aio for writing here
- aio->notifyPendingWrite();
-}
-
-void AsynchIOHandler::close() {
- ScopedLock<Mutex> l(frameQueueLock);
- frameQueueClosed = true;
-}
-
void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
-void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi)
-{
- write(pi);
-}
-
// Input side
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
return;
}
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- if(isInitiated){
- framing::AMQFrame frame;
- try{
- while(frame.decode(in)) {
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- inputHandler->received(frame);
- }
+ size_t decoded = 0;
+ if (codec) { // Already initiated
+ try {
+ decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
aio->queueWriteClose();
}
}else{
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
framing::ProtocolInitiation protocolInit;
- if(protocolInit.decode(in)){
- QPID_LOG(debug, "INIT [" << identifier << "]");
- inputHandler->initiated(protocolInit);
- isInitiated = true;
+ if (protocolInit.decode(in)) {
+ decoded = in.getPosition();
+ QPID_LOG(debug, "RECV [" << identifier << "] INIT( " << protocolInit << ")");
+ codec = factory->create(protocolInit.getVersion(), *this, identifier);
+ if (!codec) {
+ // FIXME aconway 2008-03-18: send valid version header & close connection.
+ // FIXME aconway 2008-03-18: exception type
+ throw Exception(
+ QPID_MSG("Protocol version not supported: " << protocolInit));
+ }
}
}
// TODO: unreading needs to go away, and when we can cope
// with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
+ if (decoded != size_t(buff->dataCount)) {
// Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
aio->unread(buff);
} else {
// Give whole buffer back to aio subsystem
@@ -307,7 +269,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
void AsynchIOHandler::eof(AsynchIO&) {
QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
- inputHandler->closed();
+ if (codec) codec->closed();
aio->queueWriteClose();
}
@@ -331,70 +293,22 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
- if (isClient && !isInitiated) {
- //get & write protocol header from upper layers
- write(inputHandler->getInitiation());
- isInitiated = true;
+ if (isClient && codec == 0) {
+ codec = factory->create(*this, identifier);
+ write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
- ScopedLock<Mutex> l(frameQueueLock);
-
- if (frameQueue.empty()) {
- // At this point we know that we're write idling the connection
- // so tell the input handler to queue any available output:
- inputHandler->doOutput();
- //if still no frames, theres nothing to do:
- if (frameQueue.empty()) return;
- }
-
- do {
+ if (codec == 0) return;
+ while (codec->canEncode()) {
// Try and get a queued buffer if not then construct new one
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
- framing::Buffer out(buff->bytes, buff->byteCount);
- int buffUsed = 0;
-
- framing::AMQFrame frame = frameQueue.front();
- int frameSize = frame.size();
- int framesEncoded=0;
- while (frameSize <= int(out.available())) {
- frameQueue.pop();
-
- // Encode output frame
- frame.encode(out);
- ++framesEncoded;
- buffUsed += frameSize;
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
-
- if (frameQueue.empty()) {
- //if we have run out of frames, allow upper layers to
- //generate more
- if (!frameQueueClosed) {
- inputHandler->doOutput();
- }
- if (frameQueue.empty()) {
- //if there are still no frames, we have no more to
- //do
- break;
- }
- }
- frame = frameQueue.front();
- frameSize = frame.size();
- }
- QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames ");
-
- // If frame was egregiously large complain
- if (frameSize > buff->byteCount)
- throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
-
- buff->dataCount = buffUsed;
+ if (!buff) buff = new Buff;
+ size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+ buff->dataCount = encoded;
aio->queueWrite(buff);
- } while (!frameQueue.empty());
-
- if (frameQueueClosed) {
- aio->queueWriteClose();
}
+ if (codec->isClosed())
+ aio->queueWriteClose();
}
}} // namespace qpid::sys