diff options
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 170 |
1 files changed, 121 insertions, 49 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 6e12a9c84f..b1ec580605 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -25,6 +25,12 @@ #include "qpid/framing/AMQFrame.h" #include "Connector.h" +#include "qpid/sys/AsynchIO.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Poller.h" + +#include <boost/bind.hpp> + namespace qpid { namespace client { @@ -43,9 +49,9 @@ Connector::Connector( idleIn(0), idleOut(0), timeoutHandler(0), shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size) -{ } + aio(0) +{ +} Connector::~Connector(){ if (receiver.id()) { @@ -56,19 +62,28 @@ Connector::~Connector(){ void Connector::connect(const std::string& host, int port){ socket.connect(host, port); closed = false; - receiver = Thread(this); + poller = Poller::shared_ptr(new Poller); + aio = new AsynchIO(socket, + boost::bind(&Connector::readbuff, this, _1, _2), + boost::bind(&Connector::eof, this, _1), + boost::bind(&Connector::eof, this, _1), + 0, // closed + 0, // nobuffs + boost::bind(&Connector::writebuff, this, _1)); } void Connector::init(){ ProtocolInitiation init(version); - writeBlock(&init); + + writeDataBlock(init); + receiver = Thread(this); } // Call with closedLock held bool Connector::closeInternal() { Mutex::ScopedLock l(closedLock); if (!closed) { - socket.close(); + poller->shutdown(); closed = true; return true; } @@ -92,28 +107,11 @@ OutputHandler* Connector::getOutputHandler(){ } void Connector::send(AMQFrame& frame){ - writeBlock(&frame); - QPID_LOG(trace, "SENT: " << frame); -} - -void Connector::writeBlock(AMQDataBlock* data){ Mutex::ScopedLock l(writeLock); - data->encode(outbuf); - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); -} - -void Connector::writeToSocket(char* data, size_t available){ - size_t written = 0; - while(written < available && !closed){ - ssize_t sent = socket.send(data + written, available-written); - if(sent > 0) { - lastOut = now(); - written += sent; - } - } + writeFrameQueue.push(frame); + aio->queueWrite(); + + QPID_LOG(trace, "SENT: " << frame); } void Connector::handleClosed() { @@ -121,6 +119,10 @@ void Connector::handleClosed() { shutdownHandler->shutdown(); } +// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing +// can never be called. The timeut processing needs to be added into the underlying Dispatcher code +// +// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ AbsTime t = now(); @@ -166,33 +168,103 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } -void Connector::run(){ - try{ - while(!closed){ - ssize_t available = inbuf.available(); - if(available < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - ssize_t received = socket.recv(inbuf.start(), available); - checkIdle(received); - - if(!closed && received > 0){ - inbuf.move(received); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); + +// Buffer definition +struct Buff : public AsynchIO::BufferBase { + Buff() : + AsynchIO::BufferBase(new char[65536], 65536) + {} + ~Buff() + { delete [] bytes;} +}; + +void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + + AMQFrame frame; + while(frame.decode(in)){ + QPID_LOG(trace, "RECV: " << frame); + input->received(frame); + } + // TODO: unreading needs to go away, and when we can cope + // with multiple sub-buffers in the general buffer scheme, it will + if (in.available() != 0) { + // Adjust buffer for used bytes and then "unread them" + buff->dataStart += buff->dataCount-in.available(); + buff->dataCount = in.available(); + aio.unread(buff); + } else { + // Give whole buffer back to aio subsystem + aio.queueReadBuffer(buff); + } +} + +void Connector::writebuff(AsynchIO& aio) { + Mutex::ScopedLock l(writeLock); + + if (writeFrameQueue.empty()) { + return; + } + + do { + // Try and get a queued buffer if not then construct new one + AsynchIO::BufferBase* buff = aio.getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + int buffUsed = 0; + + framing::AMQFrame frame = writeFrameQueue.front(); + int frameSize = frame.size(); + while (frameSize <= int(out.available())) { + + // Encode output frame + frame.encode(out); + buffUsed += frameSize; + + writeFrameQueue.pop(); + if (writeFrameQueue.empty()) + break; + frame = writeFrameQueue.front(); + frameSize = frame.size(); } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); + + buff->dataCount = buffUsed; + aio.queueWrite(buff); + } while (!writeFrameQueue.empty()); +} + +void Connector::writeDataBlock(const AMQDataBlock& data) { + AsynchIO::BufferBase* buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + data.encode(out); + buff->dataCount = data.size(); + aio->queueWrite(buff); +} + +void Connector::eof(AsynchIO&) { + handleClosed(); +} + +// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing +// will never be called +void Connector::run(){ + try { + Dispatcher d(poller); + + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff); } - } - } catch (const std::exception& e) { + + aio->start(poller); + d.run(); + aio->queueForDeletion(); + socket.close(); + } catch (const std::exception& e) { QPID_LOG(error, e.what()); handleClosed(); } } + }} // namespace qpid::client |