diff options
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 92 |
1 files changed, 68 insertions, 24 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index b25f19e4ba..08dae4105d 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -25,6 +25,12 @@ #include "qpid/framing/AMQFrame.h" #include "Connector.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Poller.h" + +#include <boost/bind.hpp> + namespace qpid { namespace client { @@ -43,7 +49,6 @@ Connector::Connector( idleIn(0), idleOut(0), timeoutHandler(0), shutdownHandler(0), - inbuf(receive_buffer_size), outbuf(send_buffer_size) { } @@ -56,6 +61,7 @@ Connector::~Connector(){ void Connector::connect(const std::string& host, int port){ socket.connect(host, port); closed = false; + poller = Poller::shared_ptr(new Poller); receiver = Thread(this); } @@ -68,7 +74,7 @@ void Connector::init(){ bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); if (!closed) { - socket.close(); + poller->shutdown(); closed = true; return true; } @@ -91,6 +97,8 @@ OutputHandler* Connector::getOutputHandler(){ return this; } +// TODO: astitcher 20070908: Writing still needs to be transferred to the aynchronous IO +// framework. void Connector::send(AMQFrame& frame){ writeBlock(&frame); QPID_LOG(trace, "SENT: " << frame); @@ -121,6 +129,10 @@ void Connector::handleClosed() { shutdownHandler->shutdown(); } +// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing +// can never be called. The timeut processing needs to be added into the underlying Dispatcher code +// +// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ AbsTime t = now(); @@ -166,33 +178,65 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } + +// Buffer definition +struct Buff : public AsynchIO::Buffer { + Buff() : + AsynchIO::Buffer(new char[65536], 65536) + {} + ~Buff() + { delete [] bytes;} +}; + +void Connector::readbuff(AsynchIO& aio, AsynchIO::Buffer* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + + AMQFrame frame(version); + while(frame.decode(in)){ + QPID_LOG(trace, "RECV: " << frame); + input->received(frame); + } + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (in.available() != 0) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += buff->dataCount-in.available(); + buff->dataCount = in.available(); + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} + +void Connector::eof(AsynchIO&) { + handleClosed(); +} + +// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing +// will never be called void Connector::run(){ - try{ - while(!closed){ - ssize_t available = inbuf.available(); - if(available < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - ssize_t received = socket.recv(inbuf.start(), available); - checkIdle(received); - - if(!closed && received > 0){ - inbuf.move(received); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame(version); - while(frame.decode(inbuf)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); + try { + Dispatcher d(poller); + + AsynchIO* aio = new AsynchIO(socket, + boost::bind(&Connector::readbuff, this, _1, _2), + boost::bind(&Connector::eof, this, _1), + boost::bind(&Connector::eof, this, _1)); + + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff); } - } - } catch (const std::exception& e) { + + aio->start(poller); + d.run(); + aio->queueForDeletion(); + socket.close(); + } catch (const std::exception& e) { QPID_LOG(error, e.what()); handleClosed(); } } + }} // namespace qpid::client |
