summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-01-10 22:50:23 +0000
committerAlan Conway <aconway@apache.org>2008-01-10 22:50:23 +0000
commit6005ad88685a4bad8bdaa986a8b94fdffc51b31e (patch)
tree3f263060806fa3e4d92265c3e43e0eba3c829d7a /cpp/src/qpid/sys/AsynchIOAcceptor.cpp
parent1b7c1ba40170027956d7df585eaae385d2511669 (diff)
downloadqpid-python-6005ad88685a4bad8bdaa986a8b94fdffc51b31e.tar.gz
Client always collects at least an entire frameset into a single buffer
when possible. Based on patch from Gordon Sim. - Refactor Connector::writebuff, ::send as Connector::Writer - Collect frames up to EOF notifying AIO write. - Encode all available complete framesets into buffers as compactly as possible. - Logging buffer size and frames encoded per write for client and broker. - framing::Buffer added getPosition(), getSize(), default ctor, copy ctor. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@610972 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOAcceptor.cpp')
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp246
1 files changed, 125 insertions, 121 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 9ca083354b..c2c4b545f9 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -44,12 +44,12 @@ namespace qpid {
namespace sys {
class AsynchIOAcceptor : public Acceptor {
- Poller::shared_ptr poller;
- Socket listener;
- int numIOThreads;
- const uint16_t listeningPort;
+ Poller::shared_ptr poller;
+ Socket listener;
+ int numIOThreads;
+ const uint16_t listeningPort;
-public:
+ public:
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
void run(ConnectionInputHandlerFactory* factory);
@@ -58,7 +58,7 @@ public:
uint16_t getPort() const;
std::string getHost() const;
-private:
+ private:
void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
};
@@ -69,9 +69,9 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
}
AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
- poller(new Poller),
- numIOThreads(threads),
- listeningPort(listener.listen(port, backlog))
+ poller(new Poller),
+ numIOThreads(threads),
+ listeningPort(listener.listen(port, backlog))
{}
// Buffer definition
@@ -93,53 +93,53 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
bool readError;
std::string identifier;
-public:
- AsynchIOHandler() :
- inputHandler(0),
- frameQueueClosed(false),
- initiated(false),
- readError(false)
- {}
+ public:
+ AsynchIOHandler() :
+ inputHandler(0),
+ frameQueueClosed(false),
+ initiated(false),
+ readError(false)
+ {}
- ~AsynchIOHandler() {
- if (inputHandler)
- inputHandler->closed();
- delete inputHandler;
- }
-
- void init(AsynchIO* a, ConnectionInputHandler* h) {
- aio = a;
- inputHandler = h;
- }
-
- // Output side
- void send(framing::AMQFrame&);
- void close();
- void activateOutput();
-
- // Input side
- void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
- void eof(AsynchIO& aio);
- void disconnect(AsynchIO& aio);
+ ~AsynchIOHandler() {
+ if (inputHandler)
+ inputHandler->closed();
+ delete inputHandler;
+ }
+
+ void init(AsynchIO* a, ConnectionInputHandler* h) {
+ aio = a;
+ inputHandler = h;
+ }
+
+ // Output side
+ void send(framing::AMQFrame&);
+ void close();
+ void activateOutput();
+
+ // Input side
+ void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
+ void eof(AsynchIO& aio);
+ void disconnect(AsynchIO& aio);
- // Notifications
- void nobuffs(AsynchIO& aio);
- void idle(AsynchIO& aio);
- void closedSocket(AsynchIO& aio, const Socket& s);
+ // Notifications
+ void nobuffs(AsynchIO& aio);
+ void idle(AsynchIO& aio);
+ void closedSocket(AsynchIO& aio, const Socket& s);
};
void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
- AsynchIOHandler* async = new AsynchIOHandler;
- ConnectionInputHandler* handler = f->create(async, s);
+ AsynchIOHandler* async = new AsynchIOHandler;
+ ConnectionInputHandler* handler = f->create(async, s);
AsynchIO* aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, handler);
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
@@ -158,50 +158,50 @@ std::string AsynchIOAcceptor::getHost() const {
}
void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
- Dispatcher d(poller);
- AsynchAcceptor
- acceptor(listener,
- boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
- acceptor.start(poller);
+ Dispatcher d(poller);
+ AsynchAcceptor
+ acceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
+ acceptor.start(poller);
- std::vector<Thread> t(numIOThreads-1);
+ std::vector<Thread> t(numIOThreads-1);
- // Run n-1 io threads
- for (int i=0; i<numIOThreads-1; ++i)
- t[i] = Thread(d);
+ // Run n-1 io threads
+ for (int i=0; i<numIOThreads-1; ++i)
+ t[i] = Thread(d);
- // Run final thread
- d.run();
+ // Run final thread
+ d.run();
- // Now wait for n-1 io threads to exit
- for (int i=0; i<numIOThreads-1; ++i) {
- t[i].join();
- }
+ // Now wait for n-1 io threads to exit
+ for (int i=0; i<numIOThreads-1; ++i) {
+ t[i].join();
+ }
}
void AsynchIOAcceptor::shutdown() {
- poller->shutdown();
+ poller->shutdown();
}
// Output side
void AsynchIOHandler::send(framing::AMQFrame& frame) {
- // TODO: Need to find out if we are in the callback context,
- // in the callback thread if so we can go further than just queuing the frame
- // to be handled later
- {
+ // TODO: Need to find out if we are in the callback context,
+ // in the callback thread if so we can go further than just queuing the frame
+ // to be handled later
+ {
ScopedLock<Mutex> l(frameQueueLock);
// Ignore anything seen after closing
if (!frameQueueClosed)
- frameQueue.push(frame);
- }
+ frameQueue.push(frame);
+ }
- // Activate aio for writing here
- aio->notifyPendingWrite();
+ // Activate aio for writing here
+ aio->notifyPendingWrite();
}
void AsynchIOHandler::close() {
- ScopedLock<Mutex> l(frameQueueLock);
- frameQueueClosed = true;
+ ScopedLock<Mutex> l(frameQueueLock);
+ frameQueueClosed = true;
}
void AsynchIOHandler::activateOutput() {
@@ -218,7 +218,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
framing::AMQFrame frame;
try{
while(frame.decode(in)) {
- QPID_LOG(debug, "RECV [" << identifier << "]: " << frame);
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
inputHandler->received(frame);
}
}catch(const std::exception& e){
@@ -249,9 +249,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
}
void AsynchIOHandler::eof(AsynchIO&) {
- QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
- inputHandler->closed();
- aio->queueWriteClose();
+ QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
+ inputHandler->closed();
+ aio->queueWriteClose();
}
void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
@@ -259,14 +259,14 @@ void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
if (!aio->writeQueueEmpty()) {
QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)");
}
- delete &s;
- aio->queueForDeletion();
- delete this;
+ delete &s;
+ aio->queueForDeletion();
+ delete this;
}
void AsynchIOHandler::disconnect(AsynchIO& a) {
- // treat the same as eof
- eof(a);
+ // treat the same as eof
+ eof(a);
}
// Notifications
@@ -274,50 +274,54 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
- ScopedLock<Mutex> l(frameQueueLock);
+ ScopedLock<Mutex> l(frameQueueLock);
- if (frameQueue.empty()) {
- // At this point we know that we're write idling the connection
- // so tell the input handler to queue any available output:
- inputHandler->doOutput();
- //if still no frames, theres nothing to do:
- if (frameQueue.empty()) return;
- }
+ if (frameQueue.empty()) {
+ // At this point we know that we're write idling the connection
+ // so tell the input handler to queue any available output:
+ inputHandler->doOutput();
+ //if still no frames, theres nothing to do:
+ if (frameQueue.empty()) return;
+ }
- do {
- // Try and get a queued buffer if not then construct new one
- AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
- framing::Buffer out(buff->bytes, buff->byteCount);
- int buffUsed = 0;
+ do {
+ // Try and get a queued buffer if not then construct new one
+ AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ int buffUsed = 0;
- framing::AMQFrame frame = frameQueue.front();
- int frameSize = frame.size();
- while (frameSize <= int(out.available())) {
- frameQueue.pop();
+ framing::AMQFrame frame = frameQueue.front();
+ int frameSize = frame.size();
+ int framesEncoded=0;
+ while (frameSize <= int(out.available())) {
+ frameQueue.pop();
- // Encode output frame
- frame.encode(out);
- buffUsed += frameSize;
- QPID_LOG(debug, "SENT [" << identifier << "]: " << frame);
+ // Encode output frame
+ frame.encode(out);
+ ++framesEncoded;
+ buffUsed += frameSize;
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
- if (frameQueue.empty())
- break;
- frame = frameQueue.front();
- frameSize = frame.size();
- }
- // If frame was egregiously large complain
- if (frameSize > buff->byteCount)
- throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
+ if (frameQueue.empty())
+ break;
+ frame = frameQueue.front();
+ frameSize = frame.size();
+ }
+ QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames ");
+
+ // If frame was egregiously large complain
+ if (frameSize > buff->byteCount)
+ throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
- buff->dataCount = buffUsed;
- aio->queueWrite(buff);
- } while (!frameQueue.empty());
+ buff->dataCount = buffUsed;
+ aio->queueWrite(buff);
+ } while (!frameQueue.empty());
- if (frameQueueClosed) {
- aio->queueWriteClose();
- }
+ if (frameQueueClosed) {
+ aio->queueWriteClose();
+ }
}
}} // namespace qpid::sys