diff options
| author | Gordon Sim <gsim@apache.org> | 2008-02-01 18:21:01 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-02-01 18:21:01 +0000 |
| commit | 5891c19a838bd8987fbc04d23923f4f5f2ca4636 (patch) | |
| tree | 1b8b75e076ebded9b57c84b547b8cf9b80a71427 /cpp/src/qpid/sys | |
| parent | 4db96f7ad47c69982cdc6cf7b5e5c47b00f1144b (diff) | |
| download | qpid-python-5891c19a838bd8987fbc04d23923f4f5f2ca4636.tar.gz | |
Initial cut of inter-broker bridging
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@617590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionInputHandler.h | 1 |
3 files changed, 54 insertions, 1 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 2eee8b4abd..1e87b76e04 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -38,6 +38,7 @@ class Acceptor : public qpid::SharedObject<Acceptor> virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; virtual void run(ConnectionInputHandlerFactory* factory) = 0; + virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0; virtual void shutdown() = 0; }; diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 485f8c20f4..650bb31a68 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -30,6 +30,7 @@ #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/AMQDataBlock.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" @@ -53,6 +54,7 @@ class AsynchIOAcceptor : public Acceptor { AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} void run(ConnectionInputHandlerFactory* factory); + void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory); void shutdown(); uint16_t getPort() const; @@ -92,13 +94,17 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { bool initiated; bool readError; std::string identifier; + bool isClient; + + void write(const framing::AMQDataBlock&); public: AsynchIOHandler() : inputHandler(0), frameQueueClosed(false), initiated(false), - readError(false) + readError(false), + isClient(false) {} ~AsynchIOHandler() { @@ -107,6 +113,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { delete inputHandler; } + void setClient() { isClient = true; } + void init(AsynchIO* a, ConnectionInputHandler* h) { aio = a; inputHandler = h; @@ -179,11 +187,48 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { t[i].join(); } } + +void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f) +{ + Socket* socket = new Socket();//Should be deleted by handle when socket closes + socket->connect(host, port); + AsynchIOHandler* async = new AsynchIOHandler; + async->setClient(); + ConnectionInputHandler* handler = f->create(async, *socket); + AsynchIO* aio = new AsynchIO(*socket, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + async->init(aio, handler); + + // Give connection some buffers to use + for (int i = 0; i < 4; i++) { + aio->queueReadBuffer(new Buff); + } + aio->start(poller); + +} + void AsynchIOAcceptor::shutdown() { poller->shutdown(); } + +void AsynchIOHandler::write(const framing::AMQDataBlock& data) +{ + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + data.encode(out); + buff->dataCount = data.size(); + aio->queueWrite(buff); +} + // Output side void AsynchIOHandler::send(framing::AMQFrame& frame) { // TODO: Need to find out if we are in the callback context, @@ -274,6 +319,12 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ + if (isClient && !initiated) { + //get & write protocol header from upper layers + write(inputHandler->getInitiation()); + initiated = true; + return; + } ScopedLock<Mutex> l(frameQueueLock); if (frameQueue.empty()) { diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h index 226096c5ef..1936b5ec50 100644 --- a/cpp/src/qpid/sys/ConnectionInputHandler.h +++ b/cpp/src/qpid/sys/ConnectionInputHandler.h @@ -36,6 +36,7 @@ namespace sys { public TimeoutHandler, public OutputTask { public: + virtual qpid::framing::ProtocolInitiation getInitiation() = 0; virtual void closed() = 0; }; |
