From 655b3b5806bafdd784f6a9c242e26341bd6aeccc Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Fri, 31 Aug 2007 18:20:29 +0000 Subject: * Changes to make C++ client code use the asynchronous network IO * Fixed up the test for buffer changes * Removed unused buffer operations git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571529 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/Connector.cpp | 170 +++++++++++++++++++++++++++----------- cpp/src/qpid/client/Connector.h | 22 +++-- 2 files changed, 136 insertions(+), 56 deletions(-) (limited to 'cpp/src/qpid/client') diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 6e12a9c84f..b1ec580605 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -25,6 +25,12 @@ #include "qpid/framing/AMQFrame.h" #include "Connector.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Poller.h" + +#include + namespace qpid { namespace client { @@ -43,9 +49,9 @@ Connector::Connector( idleIn(0), idleOut(0), timeoutHandler(0), shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size) -{ } + aio(0) +{ +} Connector::~Connector(){ if (receiver.id()) { @@ -56,19 +62,28 @@ Connector::~Connector(){ void Connector::connect(const std::string& host, int port){ socket.connect(host, port); closed = false; - receiver = Thread(this); + poller = Poller::shared_ptr(new Poller); + aio = new AsynchIO(socket, + boost::bind(&Connector::readbuff, this, _1, _2), + boost::bind(&Connector::eof, this, _1), + boost::bind(&Connector::eof, this, _1), + 0, // closed + 0, // nobuffs + boost::bind(&Connector::writebuff, this, _1)); } void Connector::init(){ ProtocolInitiation init(version); - writeBlock(&init); + + writeDataBlock(init); + receiver = Thread(this); } // Call with closedLock held bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); if (!closed) { - socket.close(); + poller->shutdown(); closed = true; return true; } @@ -92,28 +107,11 @@ OutputHandler* Connector::getOutputHandler(){ } void Connector::send(AMQFrame& frame){ - writeBlock(&frame); - QPID_LOG(trace, "SENT: " << frame); -} - -void Connector::writeBlock(AMQDataBlock* data){ Mutex::ScopedLock l(writeLock); - data->encode(outbuf); - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); -} - -void Connector::writeToSocket(char* data, size_t available){ - size_t written = 0; - while(written < available && !closed){ - ssize_t sent = socket.send(data + written, available-written); - if(sent > 0) { - lastOut = now(); - written += sent; - } - } + writeFrameQueue.push(frame); + aio->queueWrite(); + + QPID_LOG(trace, "SENT: " << frame); } void Connector::handleClosed() { @@ -121,6 +119,10 @@ void Connector::handleClosed() { shutdownHandler->shutdown(); } +// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing +// can never be called. The timeut processing needs to be added into the underlying Dispatcher code +// +// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ AbsTime t = now(); @@ -166,33 +168,103 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } -void Connector::run(){ - try{ - while(!closed){ - ssize_t available = inbuf.available(); - if(available < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - ssize_t received = socket.recv(inbuf.start(), available); - checkIdle(received); - - if(!closed && received > 0){ - inbuf.move(received); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); + +// Buffer definition +struct Buff : public AsynchIO::BufferBase { + Buff() : + AsynchIO::BufferBase(new char[65536], 65536) + {} + ~Buff() + { delete [] bytes;} +}; + +void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV: " << frame); + input->received(frame); + } + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (in.available() != 0) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += buff->dataCount-in.available(); + buff->dataCount = in.available(); + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} + +void Connector::writebuff(AsynchIO& aio) { + Mutex::ScopedLock l(writeLock); + + if (writeFrameQueue.empty()) { + return; + } + + do { + // Try and get a queued buffer if not then construct new one + AsynchIO::BufferBase* buff = aio.getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + int buffUsed = 0; + + framing::AMQFrame frame = writeFrameQueue.front(); + int frameSize = frame.size(); + while (frameSize <= int(out.available())) { + + // Encode output frame + frame.encode(out); + buffUsed += frameSize; + + writeFrameQueue.pop(); + if (writeFrameQueue.empty()) + break; + frame = writeFrameQueue.front(); + frameSize = frame.size(); } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); + + buff->dataCount = buffUsed; + aio.queueWrite(buff); + } while (!writeFrameQueue.empty()); +} + +void Connector::writeDataBlock(const AMQDataBlock& data) { + AsynchIO::BufferBase* buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + data.encode(out); + buff->dataCount = data.size(); + aio->queueWrite(buff); +} + +void Connector::eof(AsynchIO&) { + handleClosed(); +} + +// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing +// will never be called +void Connector::run(){ + try { + Dispatcher d(poller); + + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff); } - } - } catch (const std::exception& e) { + + aio->start(poller); + d.run(); + aio->queueForDeletion(); + socket.close(); + } catch (const std::exception& e) { QPID_LOG(error, e.what()); handleClosed(); } } + }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 1577564d57..8aaaea247a 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -34,9 +34,12 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Socket.h" #include "qpid/sys/Time.h" +#include "qpid/sys/AsynchIO.h" -namespace qpid { +#include +namespace qpid { + namespace client { class Connector : public framing::OutputHandler, @@ -61,24 +64,29 @@ class Connector : public framing::OutputHandler, framing::InputHandler* input; framing::InitiationHandler* initialiser; framing::OutputHandler* output; - - framing::Buffer inbuf; - framing::Buffer outbuf; sys::Mutex writeLock; + std::queue writeFrameQueue; + sys::Thread receiver; sys::Socket socket; + sys::AsynchIO* aio; + sys::Poller::shared_ptr poller; + void checkIdle(ssize_t status); - void writeBlock(framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); void setSocketTimeout(); void run(); void handleClosed(); bool closeInternal(); - + + void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*); + void writebuff(qpid::sys::AsynchIO&); + void writeDataBlock(const framing::AMQDataBlock& data); + void eof(qpid::sys::AsynchIO&); + friend class Channel; public: Connector(framing::ProtocolVersion pVersion, -- cgit v1.2.1