summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-02-01 18:21:01 +0000
committerGordon Sim <gsim@apache.org>2008-02-01 18:21:01 +0000
commit5891c19a838bd8987fbc04d23923f4f5f2ca4636 (patch)
tree1b8b75e076ebded9b57c84b547b8cf9b80a71427 /cpp/src/qpid/sys
parent4db96f7ad47c69982cdc6cf7b5e5c47b00f1144b (diff)
downloadqpid-python-5891c19a838bd8987fbc04d23923f4f5f2ca4636.tar.gz
Initial cut of inter-broker bridging
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@617590 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/Acceptor.h1
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp53
-rw-r--r--cpp/src/qpid/sys/ConnectionInputHandler.h1
3 files changed, 54 insertions, 1 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 2eee8b4abd..1e87b76e04 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -38,6 +38,7 @@ class Acceptor : public qpid::SharedObject<Acceptor>
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
virtual void run(ConnectionInputHandlerFactory* factory) = 0;
+ virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
virtual void shutdown() = 0;
};
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 485f8c20f4..650bb31a68 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -30,6 +30,7 @@
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/AMQDataBlock.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
@@ -53,6 +54,7 @@ class AsynchIOAcceptor : public Acceptor {
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
+ void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
void shutdown();
uint16_t getPort() const;
@@ -92,13 +94,17 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
bool initiated;
bool readError;
std::string identifier;
+ bool isClient;
+
+ void write(const framing::AMQDataBlock&);
public:
AsynchIOHandler() :
inputHandler(0),
frameQueueClosed(false),
initiated(false),
- readError(false)
+ readError(false),
+ isClient(false)
{}
~AsynchIOHandler() {
@@ -107,6 +113,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
delete inputHandler;
}
+ void setClient() { isClient = true; }
+
void init(AsynchIO* a, ConnectionInputHandler* h) {
aio = a;
inputHandler = h;
@@ -179,11 +187,48 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
t[i].join();
}
}
+
+void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+{
+ Socket* socket = new Socket();//Should be deleted by handle when socket closes
+ socket->connect(host, port);
+ AsynchIOHandler* async = new AsynchIOHandler;
+ async->setClient();
+ ConnectionInputHandler* handler = f->create(async, *socket);
+ AsynchIO* aio = new AsynchIO(*socket,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
+
+ // Give connection some buffers to use
+ for (int i = 0; i < 4; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
+ aio->start(poller);
+
+}
+
void AsynchIOAcceptor::shutdown() {
poller->shutdown();
}
+
+void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+{
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.size();
+ aio->queueWrite(buff);
+}
+
// Output side
void AsynchIOHandler::send(framing::AMQFrame& frame) {
// TODO: Need to find out if we are in the callback context,
@@ -274,6 +319,12 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
+ if (isClient && !initiated) {
+ //get & write protocol header from upper layers
+ write(inputHandler->getInitiation());
+ initiated = true;
+ return;
+ }
ScopedLock<Mutex> l(frameQueueLock);
if (frameQueue.empty()) {
diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h
index 226096c5ef..1936b5ec50 100644
--- a/cpp/src/qpid/sys/ConnectionInputHandler.h
+++ b/cpp/src/qpid/sys/ConnectionInputHandler.h
@@ -36,6 +36,7 @@ namespace sys {
public TimeoutHandler, public OutputTask
{
public:
+ virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
virtual void closed() = 0;
};