diff options
| author | Alan Conway <aconway@apache.org> | 2008-01-10 22:50:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-01-10 22:50:23 +0000 |
| commit | 6005ad88685a4bad8bdaa986a8b94fdffc51b31e (patch) | |
| tree | 3f263060806fa3e4d92265c3e43e0eba3c829d7a /cpp/src/qpid/sys/AsynchIOAcceptor.cpp | |
| parent | 1b7c1ba40170027956d7df585eaae385d2511669 (diff) | |
| download | qpid-python-6005ad88685a4bad8bdaa986a8b94fdffc51b31e.tar.gz | |
Client always collects at least an entire frameset into a single buffer
when possible. Based on patch from Gordon Sim.
- Refactor Connector::writebuff, ::send as Connector::Writer
- Collect frames up to EOF notifying AIO write.
- Encode all available complete framesets into buffers as compactly as possible.
- Logging buffer size and frames encoded per write for client and broker.
- framing::Buffer added getPosition(), getSize(), default ctor, copy ctor.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@610972 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOAcceptor.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 246 |
1 files changed, 125 insertions, 121 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 9ca083354b..c2c4b545f9 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -44,12 +44,12 @@ namespace qpid { namespace sys { class AsynchIOAcceptor : public Acceptor { - Poller::shared_ptr poller; - Socket listener; - int numIOThreads; - const uint16_t listeningPort; + Poller::shared_ptr poller; + Socket listener; + int numIOThreads; + const uint16_t listeningPort; -public: + public: AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} void run(ConnectionInputHandlerFactory* factory); @@ -58,7 +58,7 @@ public: uint16_t getPort() const; std::string getHost() const; -private: + private: void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*); }; @@ -69,9 +69,9 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) } AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : - poller(new Poller), - numIOThreads(threads), - listeningPort(listener.listen(port, backlog)) + poller(new Poller), + numIOThreads(threads), + listeningPort(listener.listen(port, backlog)) {} // Buffer definition @@ -93,53 +93,53 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { bool readError; std::string identifier; -public: - AsynchIOHandler() : - inputHandler(0), - frameQueueClosed(false), - initiated(false), - readError(false) - {} + public: + AsynchIOHandler() : + inputHandler(0), + frameQueueClosed(false), + initiated(false), + readError(false) + {} - ~AsynchIOHandler() { - if (inputHandler) - inputHandler->closed(); - delete inputHandler; - } - - void init(AsynchIO* a, ConnectionInputHandler* h) { - aio = a; - inputHandler = h; - } - - // Output side - void send(framing::AMQFrame&); - void close(); - void activateOutput(); - - // Input side - void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); - void eof(AsynchIO& aio); - void disconnect(AsynchIO& aio); + ~AsynchIOHandler() { + if (inputHandler) + inputHandler->closed(); + delete inputHandler; + } + + void init(AsynchIO* a, ConnectionInputHandler* h) { + aio = a; + inputHandler = h; + } + + // Output side + void send(framing::AMQFrame&); + void close(); + void activateOutput(); + + // Input side + void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); + void eof(AsynchIO& aio); + void disconnect(AsynchIO& aio); - // Notifications - void nobuffs(AsynchIO& aio); - void idle(AsynchIO& aio); - void closedSocket(AsynchIO& aio, const Socket& s); + // Notifications + void nobuffs(AsynchIO& aio); + void idle(AsynchIO& aio); + void closedSocket(AsynchIO& aio, const Socket& s); }; void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { - AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async, s); + AsynchIOHandler* async = new AsynchIOHandler; + ConnectionInputHandler* handler = f->create(async, s); AsynchIO* aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + async->init(aio, handler); // Give connection some buffers to use for (int i = 0; i < 4; i++) { @@ -158,50 +158,50 @@ std::string AsynchIOAcceptor::getHost() const { } void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { - Dispatcher d(poller); - AsynchAcceptor - acceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); - acceptor.start(poller); + Dispatcher d(poller); + AsynchAcceptor + acceptor(listener, + boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); + acceptor.start(poller); - std::vector<Thread> t(numIOThreads-1); + std::vector<Thread> t(numIOThreads-1); - // Run n-1 io threads - for (int i=0; i<numIOThreads-1; ++i) - t[i] = Thread(d); + // Run n-1 io threads + for (int i=0; i<numIOThreads-1; ++i) + t[i] = Thread(d); - // Run final thread - d.run(); + // Run final thread + d.run(); - // Now wait for n-1 io threads to exit - for (int i=0; i<numIOThreads-1; ++i) { - t[i].join(); - } + // Now wait for n-1 io threads to exit + for (int i=0; i<numIOThreads-1; ++i) { + t[i].join(); + } } void AsynchIOAcceptor::shutdown() { - poller->shutdown(); + poller->shutdown(); } // Output side void AsynchIOHandler::send(framing::AMQFrame& frame) { - // TODO: Need to find out if we are in the callback context, - // in the callback thread if so we can go further than just queuing the frame - // to be handled later - { + // TODO: Need to find out if we are in the callback context, + // in the callback thread if so we can go further than just queuing the frame + // to be handled later + { ScopedLock<Mutex> l(frameQueueLock); // Ignore anything seen after closing if (!frameQueueClosed) - frameQueue.push(frame); - } + frameQueue.push(frame); + } - // Activate aio for writing here - aio->notifyPendingWrite(); + // Activate aio for writing here + aio->notifyPendingWrite(); } void AsynchIOHandler::close() { - ScopedLock<Mutex> l(frameQueueLock); - frameQueueClosed = true; + ScopedLock<Mutex> l(frameQueueLock); + frameQueueClosed = true; } void AsynchIOHandler::activateOutput() { @@ -218,7 +218,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { framing::AMQFrame frame; try{ while(frame.decode(in)) { - QPID_LOG(debug, "RECV [" << identifier << "]: " << frame); + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); inputHandler->received(frame); } }catch(const std::exception& e){ @@ -249,9 +249,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { } void AsynchIOHandler::eof(AsynchIO&) { - QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); - inputHandler->closed(); - aio->queueWriteClose(); + QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); + inputHandler->closed(); + aio->queueWriteClose(); } void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { @@ -259,14 +259,14 @@ void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { if (!aio->writeQueueEmpty()) { QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)"); } - delete &s; - aio->queueForDeletion(); - delete this; + delete &s; + aio->queueForDeletion(); + delete this; } void AsynchIOHandler::disconnect(AsynchIO& a) { - // treat the same as eof - eof(a); + // treat the same as eof + eof(a); } // Notifications @@ -274,50 +274,54 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ - ScopedLock<Mutex> l(frameQueueLock); + ScopedLock<Mutex> l(frameQueueLock); - if (frameQueue.empty()) { - // At this point we know that we're write idling the connection - // so tell the input handler to queue any available output: - inputHandler->doOutput(); - //if still no frames, theres nothing to do: - if (frameQueue.empty()) return; - } + if (frameQueue.empty()) { + // At this point we know that we're write idling the connection + // so tell the input handler to queue any available output: + inputHandler->doOutput(); + //if still no frames, theres nothing to do: + if (frameQueue.empty()) return; + } - do { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - int buffUsed = 0; + do { + // Try and get a queued buffer if not then construct new one + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + int buffUsed = 0; - framing::AMQFrame frame = frameQueue.front(); - int frameSize = frame.size(); - while (frameSize <= int(out.available())) { - frameQueue.pop(); + framing::AMQFrame frame = frameQueue.front(); + int frameSize = frame.size(); + int framesEncoded=0; + while (frameSize <= int(out.available())) { + frameQueue.pop(); - // Encode output frame - frame.encode(out); - buffUsed += frameSize; - QPID_LOG(debug, "SENT [" << identifier << "]: " << frame); + // Encode output frame + frame.encode(out); + ++framesEncoded; + buffUsed += frameSize; + QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); - if (frameQueue.empty()) - break; - frame = frameQueue.front(); - frameSize = frame.size(); - } - // If frame was egregiously large complain - if (frameSize > buff->byteCount) - throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); + if (frameQueue.empty()) + break; + frame = frameQueue.front(); + frameSize = frame.size(); + } + QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames "); + + // If frame was egregiously large complain + if (frameSize > buff->byteCount) + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); - buff->dataCount = buffUsed; - aio->queueWrite(buff); - } while (!frameQueue.empty()); + buff->dataCount = buffUsed; + aio->queueWrite(buff); + } while (!frameQueue.empty()); - if (frameQueueClosed) { - aio->queueWriteClose(); - } + if (frameQueueClosed) { + aio->queueWriteClose(); + } } }} // namespace qpid::sys |
