Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r--  cpp/src/qpid/client/Connector.cpp | 170
1 file changed, 121 insertions(+), 49 deletions(-)
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 6e12a9c84f..b1ec580605 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -25,6 +25,12 @@
#include "qpid/framing/AMQFrame.h"
#include "Connector.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+
+#include <boost/bind.hpp>
+
namespace qpid {
namespace client {
@@ -43,9 +49,9 @@ Connector::Connector(
idleIn(0), idleOut(0),
timeoutHandler(0),
shutdownHandler(0),
- inbuf(receive_buffer_size),
- outbuf(send_buffer_size)
-{ }
+ aio(0)
+{
+}
Connector::~Connector(){
if (receiver.id()) {
@@ -56,19 +62,28 @@ Connector::~Connector(){
void Connector::connect(const std::string& host, int port){
socket.connect(host, port);
closed = false;
- receiver = Thread(this);
+ poller = Poller::shared_ptr(new Poller);
+ aio = new AsynchIO(socket,
+ boost::bind(&Connector::readbuff, this, _1, _2),
+ boost::bind(&Connector::eof, this, _1),
+ boost::bind(&Connector::eof, this, _1),
+ 0, // closed
+ 0, // nobuffs
+ boost::bind(&Connector::writebuff, this, _1));
}
void Connector::init(){
ProtocolInitiation init(version);
- writeBlock(&init);
+
+ writeDataBlock(init);
+ receiver = Thread(this);
}
// Call with closedLock held
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
if (!closed) {
- socket.close();
+ poller->shutdown();
closed = true;
return true;
}
@@ -92,28 +107,11 @@ OutputHandler* Connector::getOutputHandler(){
}
void Connector::send(AMQFrame& frame){
- writeBlock(&frame);
- QPID_LOG(trace, "SENT: " << frame);
-}
-
-void Connector::writeBlock(AMQDataBlock* data){
Mutex::ScopedLock l(writeLock);
- data->encode(outbuf);
- //transfer data to wire
- outbuf.flip();
- writeToSocket(outbuf.start(), outbuf.available());
- outbuf.clear();
-}
-
-void Connector::writeToSocket(char* data, size_t available){
- size_t written = 0;
- while(written < available && !closed){
- ssize_t sent = socket.send(data + written, available-written);
- if(sent > 0) {
- lastOut = now();
- written += sent;
- }
- }
+ writeFrameQueue.push(frame);
+ aio->queueWrite();
+
+ QPID_LOG(trace, "SENT: " << frame);
}
void Connector::handleClosed() {
@@ -121,6 +119,10 @@ void Connector::handleClosed() {
shutdownHandler->shutdown();
}
+// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
+// can never be called. The timeout processing needs to be added into the underlying Dispatcher code
+//
+// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof
void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
AbsTime t = now();
@@ -166,33 +168,103 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
-void Connector::run(){
- try{
- while(!closed){
- ssize_t available = inbuf.available();
- if(available < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- ssize_t received = socket.recv(inbuf.start(), available);
- checkIdle(received);
-
- if(!closed && received > 0){
- inbuf.move(received);
- inbuf.flip();//position = 0, limit = total data read
-
- AMQFrame frame;
- while(frame.decode(inbuf)){
- QPID_LOG(trace, "RECV: " << frame);
- input->received(frame);
+
+// Buffer definition
+struct Buff : public AsynchIO::BufferBase {
+ Buff() :
+ AsynchIO::BufferBase(new char[65536], 65536)
+ {}
+ ~Buff()
+ { delete [] bytes;}
+};
+
+void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV: " << frame);
+ input->received(frame);
+ }
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (in.available() != 0) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += buff->dataCount-in.available();
+ buff->dataCount = in.available();
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+}
+
+void Connector::writebuff(AsynchIO& aio) {
+ Mutex::ScopedLock l(writeLock);
+
+ if (writeFrameQueue.empty()) {
+ return;
+ }
+
+ do {
+ // Try to get a queued buffer; if there isn't one, construct a new one
+ AsynchIO::BufferBase* buff = aio.getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ int buffUsed = 0;
+
+ framing::AMQFrame frame = writeFrameQueue.front();
+ int frameSize = frame.size();
+ while (frameSize <= int(out.available())) {
+
+ // Encode output frame
+ frame.encode(out);
+ buffUsed += frameSize;
+
+ writeFrameQueue.pop();
+ if (writeFrameQueue.empty())
+ break;
+ frame = writeFrameQueue.front();
+ frameSize = frame.size();
}
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
+
+ buff->dataCount = buffUsed;
+ aio.queueWrite(buff);
+ } while (!writeFrameQueue.empty());
+}
+
+void Connector::writeDataBlock(const AMQDataBlock& data) {
+ AsynchIO::BufferBase* buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.size();
+ aio->queueWrite(buff);
+}
+
+void Connector::eof(AsynchIO&) {
+ handleClosed();
+}
+
+// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
+// will never be called
+void Connector::run(){
+ try {
+ Dispatcher d(poller);
+
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff);
}
- }
- } catch (const std::exception& e) {
+
+ aio->start(poller);
+ d.run();
+ aio->queueForDeletion();
+ socket.close();
+ } catch (const std::exception& e) {
QPID_LOG(error, e.what());
handleClosed();
}
}
+
}} // namespace qpid::client
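
Below is a self-contained sketch (not part of the commit) of the callback-wiring pattern the new Connector::connect() uses: member functions are bound with boost::bind and handed to the asynchronous I/O layer, which later invokes them from the Dispatcher thread. ToyAsynchIO and Client are hypothetical stand-ins for qpid::sys::AsynchIO and qpid::client::Connector; only the boost::bind/boost::function mechanics mirror the real code.

// Illustrative stand-ins: ToyAsynchIO and Client are hypothetical types used
// only to demonstrate the boost::bind callback wiring; they are not the real
// qpid::sys::AsynchIO or qpid::client::Connector interfaces.
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <iostream>
#include <string>

// Toy asynchronous I/O layer: stores read/eof/idle callbacks and invokes them
// when "events" occur, the way AsynchIO drives readbuff/eof/writebuff above.
class ToyAsynchIO {
public:
    typedef boost::function<void (ToyAsynchIO&, const std::string&)> ReadCallback;
    typedef boost::function<void (ToyAsynchIO&)> EofCallback;
    typedef boost::function<void (ToyAsynchIO&)> IdleCallback;

    ToyAsynchIO(ReadCallback rd, EofCallback e, IdleCallback idle) :
        readCb(rd), eofCb(e), idleCb(idle) {}

    // In the real code the Poller/Dispatcher thread triggers these.
    void simulateRead(const std::string& data) { readCb(*this, data); }
    void simulateIdle() { idleCb(*this); }
    void simulateEof() { eofCb(*this); }

private:
    ReadCallback readCb;
    EofCallback eofCb;
    IdleCallback idleCb;
};

// Toy client: binds its member functions into the I/O layer with boost::bind,
// as Connector::connect() does in the diff above.
class Client {
public:
    ToyAsynchIO* connect() {
        return new ToyAsynchIO(
            boost::bind(&Client::readbuff, this, _1, _2),
            boost::bind(&Client::eof, this, _1),
            boost::bind(&Client::writebuff, this, _1));
    }

    void readbuff(ToyAsynchIO&, const std::string& data) {
        std::cout << "RECV: " << data << std::endl;
    }
    void eof(ToyAsynchIO&) {
        std::cout << "connection closed" << std::endl;
    }
    void writebuff(ToyAsynchIO&) {
        std::cout << "idle: encode and queue buffered frames" << std::endl;
    }
};

int main() {
    Client client;
    ToyAsynchIO* aio = client.connect();
    aio->simulateRead("frame bytes");
    aio->simulateIdle();
    aio->simulateEof();
    delete aio;
    return 0;
}

In the diff itself the lifecycle is: connect() builds the Poller/AsynchIO and wires these callbacks, init() writes the ProtocolInitiation and starts the receiver thread, run() queues read buffers and drives the Dispatcher, and closeInternal() stops everything via poller->shutdown().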