diff options
| author | Alan Conway <aconway@apache.org> | 2008-01-10 22:50:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-01-10 22:50:23 +0000 |
| commit | 6005ad88685a4bad8bdaa986a8b94fdffc51b31e (patch) | |
| tree | 3f263060806fa3e4d92265c3e43e0eba3c829d7a /cpp/src/qpid/sys | |
| parent | 1b7c1ba40170027956d7df585eaae385d2511669 (diff) | |
| download | qpid-python-6005ad88685a4bad8bdaa986a8b94fdffc51b31e.tar.gz | |
Client always collects at least an entire frameset into a single buffer
when possible. Based on patch from Gordon Sim.
- Refactor Connector::writebuff, ::send as Connector::Writer
- Collect frames up to EOF notifying AIO write.
- Encode all available complete framesets into buffers as compactly as possible.
- Logging buffer size and frames encoded per write for client and broker.
- framing::Buffer added getPosition(), getSize(), default ctor, copy ctor.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@610972 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 246 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 59 |
2 files changed, 156 insertions, 149 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 9ca083354b..c2c4b545f9 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -44,12 +44,12 @@ namespace qpid { namespace sys { class AsynchIOAcceptor : public Acceptor { - Poller::shared_ptr poller; - Socket listener; - int numIOThreads; - const uint16_t listeningPort; + Poller::shared_ptr poller; + Socket listener; + int numIOThreads; + const uint16_t listeningPort; -public: + public: AsynchIOAcceptor(int16_t port, int backlog, int threads); ~AsynchIOAcceptor() {} void run(ConnectionInputHandlerFactory* factory); @@ -58,7 +58,7 @@ public: uint16_t getPort() const; std::string getHost() const; -private: + private: void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*); }; @@ -69,9 +69,9 @@ Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) } AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : - poller(new Poller), - numIOThreads(threads), - listeningPort(listener.listen(port, backlog)) + poller(new Poller), + numIOThreads(threads), + listeningPort(listener.listen(port, backlog)) {} // Buffer definition @@ -93,53 +93,53 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { bool readError; std::string identifier; -public: - AsynchIOHandler() : - inputHandler(0), - frameQueueClosed(false), - initiated(false), - readError(false) - {} + public: + AsynchIOHandler() : + inputHandler(0), + frameQueueClosed(false), + initiated(false), + readError(false) + {} - ~AsynchIOHandler() { - if (inputHandler) - inputHandler->closed(); - delete inputHandler; - } - - void init(AsynchIO* a, ConnectionInputHandler* h) { - aio = a; - inputHandler = h; - } - - // Output side - void send(framing::AMQFrame&); - void close(); - void activateOutput(); - - // Input side - void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); - void eof(AsynchIO& aio); - void disconnect(AsynchIO& aio); + ~AsynchIOHandler() { + if (inputHandler) + inputHandler->closed(); + delete inputHandler; + } + + void init(AsynchIO* a, ConnectionInputHandler* h) { + aio = a; + inputHandler = h; + } + + // Output side + void send(framing::AMQFrame&); + void close(); + void activateOutput(); + + // Input side + void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); + void eof(AsynchIO& aio); + void disconnect(AsynchIO& aio); - // Notifications - void nobuffs(AsynchIO& aio); - void idle(AsynchIO& aio); - void closedSocket(AsynchIO& aio, const Socket& s); + // Notifications + void nobuffs(AsynchIO& aio); + void idle(AsynchIO& aio); + void closedSocket(AsynchIO& aio, const Socket& s); }; void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) { - AsynchIOHandler* async = new AsynchIOHandler; - ConnectionInputHandler* handler = f->create(async, s); + AsynchIOHandler* async = new AsynchIOHandler; + ConnectionInputHandler* handler = f->create(async, s); AsynchIO* aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, handler); + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + async->init(aio, handler); // Give connection some buffers to use for (int i = 0; i < 4; i++) { @@ -158,50 +158,50 @@ std::string AsynchIOAcceptor::getHost() const { } void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) { - Dispatcher d(poller); - AsynchAcceptor - acceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); - acceptor.start(poller); + Dispatcher d(poller); + AsynchAcceptor + acceptor(listener, + boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); + acceptor.start(poller); - std::vector<Thread> t(numIOThreads-1); + std::vector<Thread> t(numIOThreads-1); - // Run n-1 io threads - for (int i=0; i<numIOThreads-1; ++i) - t[i] = Thread(d); + // Run n-1 io threads + for (int i=0; i<numIOThreads-1; ++i) + t[i] = Thread(d); - // Run final thread - d.run(); + // Run final thread + d.run(); - // Now wait for n-1 io threads to exit - for (int i=0; i<numIOThreads-1; ++i) { - t[i].join(); - } + // Now wait for n-1 io threads to exit + for (int i=0; i<numIOThreads-1; ++i) { + t[i].join(); + } } void AsynchIOAcceptor::shutdown() { - poller->shutdown(); + poller->shutdown(); } // Output side void AsynchIOHandler::send(framing::AMQFrame& frame) { - // TODO: Need to find out if we are in the callback context, - // in the callback thread if so we can go further than just queuing the frame - // to be handled later - { + // TODO: Need to find out if we are in the callback context, + // in the callback thread if so we can go further than just queuing the frame + // to be handled later + { ScopedLock<Mutex> l(frameQueueLock); // Ignore anything seen after closing if (!frameQueueClosed) - frameQueue.push(frame); - } + frameQueue.push(frame); + } - // Activate aio for writing here - aio->notifyPendingWrite(); + // Activate aio for writing here + aio->notifyPendingWrite(); } void AsynchIOHandler::close() { - ScopedLock<Mutex> l(frameQueueLock); - frameQueueClosed = true; + ScopedLock<Mutex> l(frameQueueLock); + frameQueueClosed = true; } void AsynchIOHandler::activateOutput() { @@ -218,7 +218,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { framing::AMQFrame frame; try{ while(frame.decode(in)) { - QPID_LOG(debug, "RECV [" << identifier << "]: " << frame); + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); inputHandler->received(frame); } }catch(const std::exception& e){ @@ -249,9 +249,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { } void AsynchIOHandler::eof(AsynchIO&) { - QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); - inputHandler->closed(); - aio->queueWriteClose(); + QPID_LOG(debug, "DISCONNECTED [" << identifier << "]"); + inputHandler->closed(); + aio->queueWriteClose(); } void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { @@ -259,14 +259,14 @@ void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { if (!aio->writeQueueEmpty()) { QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)"); } - delete &s; - aio->queueForDeletion(); - delete this; + delete &s; + aio->queueForDeletion(); + delete this; } void AsynchIOHandler::disconnect(AsynchIO& a) { - // treat the same as eof - eof(a); + // treat the same as eof + eof(a); } // Notifications @@ -274,50 +274,54 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ - ScopedLock<Mutex> l(frameQueueLock); + ScopedLock<Mutex> l(frameQueueLock); - if (frameQueue.empty()) { - // At this point we know that we're write idling the connection - // so tell the input handler to queue any available output: - inputHandler->doOutput(); - //if still no frames, theres nothing to do: - if (frameQueue.empty()) return; - } + if (frameQueue.empty()) { + // At this point we know that we're write idling the connection + // so tell the input handler to queue any available output: + inputHandler->doOutput(); + //if still no frames, theres nothing to do: + if (frameQueue.empty()) return; + } - do { - // Try and get a queued buffer if not then construct new one - AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - framing::Buffer out(buff->bytes, buff->byteCount); - int buffUsed = 0; + do { + // Try and get a queued buffer if not then construct new one + AsynchIO::BufferBase* buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + int buffUsed = 0; - framing::AMQFrame frame = frameQueue.front(); - int frameSize = frame.size(); - while (frameSize <= int(out.available())) { - frameQueue.pop(); + framing::AMQFrame frame = frameQueue.front(); + int frameSize = frame.size(); + int framesEncoded=0; + while (frameSize <= int(out.available())) { + frameQueue.pop(); - // Encode output frame - frame.encode(out); - buffUsed += frameSize; - QPID_LOG(debug, "SENT [" << identifier << "]: " << frame); + // Encode output frame + frame.encode(out); + ++framesEncoded; + buffUsed += frameSize; + QPID_LOG(trace, "SENT [" << identifier << "]: " << frame); - if (frameQueue.empty()) - break; - frame = frameQueue.front(); - frameSize = frame.size(); - } - // If frame was egregiously large complain - if (frameSize > buff->byteCount) - throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); + if (frameQueue.empty()) + break; + frame = frameQueue.front(); + frameSize = frame.size(); + } + QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames "); + + // If frame was egregiously large complain + if (frameSize > buff->byteCount) + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); - buff->dataCount = buffUsed; - aio->queueWrite(buff); - } while (!frameQueue.empty()); + buff->dataCount = buffUsed; + aio->queueWrite(buff); + } while (!frameQueue.empty()); - if (frameQueueClosed) { - aio->queueWriteClose(); - } + if (frameQueueClosed) { + aio->queueWriteClose(); + } } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index f8aaa38cf5..94c68bd5d0 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -42,7 +42,7 @@ namespace { * pipe/socket (necessary as default action is to terminate process) */ void ignoreSigpipe() { - ::signal(SIGPIPE, SIG_IGN); + ::signal(SIGPIPE, SIG_IGN); } /* @@ -88,7 +88,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) { if (s) { acceptedCallback(*s); } else { - break; + break; } } while (true); @@ -99,13 +99,13 @@ void AsynchAcceptor::readable(DispatchHandle& h) { * Asynch reader/writer */ AsynchIO::AsynchIO(const Socket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : DispatchHandle(s, - boost::bind(&AsynchIO::readable, this, _1), - boost::bind(&AsynchIO::writeable, this, _1), - boost::bind(&AsynchIO::disconnected, this, _1)), + boost::bind(&AsynchIO::readable, this, _1), + boost::bind(&AsynchIO::writeable, this, _1), + boost::bind(&AsynchIO::disconnected, this, _1)), readCallback(rCb), eofCallback(eofCb), disCallback(disCb), @@ -120,8 +120,8 @@ AsynchIO::AsynchIO(const Socket& s, struct deleter { - template <typename T> - void operator()(T *ptr){ delete ptr;} + template <typename T> + void operator()(T *ptr){ delete ptr;} }; AsynchIO::~AsynchIO() { @@ -138,7 +138,7 @@ void AsynchIO::start(Poller::shared_ptr poller) { } void AsynchIO::queueReadBuffer(BufferBase* buff) { - assert(buff); + assert(buff); buff->dataStart = 0; buff->dataCount = 0; bufferQueue.push_back(buff); @@ -146,11 +146,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { } void AsynchIO::unread(BufferBase* buff) { - assert(buff); - if (buff->dataStart != 0) { - memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); - buff->dataStart = 0; - } + assert(buff); + if (buff->dataStart != 0) { + memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); + buff->dataStart = 0; + } bufferQueue.push_front(buff); DispatchHandle::rewatchRead(); } @@ -182,14 +182,15 @@ void AsynchIO::queueWriteClose() { * to spare */ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { - // Always keep at least one buffer (it might have data that was "unread" in it) - if (bufferQueue.size()<=1) - return 0; - BufferBase* buff = bufferQueue.back(); - buff->dataStart = 0; - buff->dataCount = 0; - bufferQueue.pop_back(); - return buff; + // Always keep at least one buffer (it might have data that was "unread" in it) + if (bufferQueue.size()<=1) + return 0; + BufferBase* buff = bufferQueue.back(); + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + bufferQueue.pop_back(); + return buff; } /* @@ -204,6 +205,7 @@ void AsynchIO::readable(DispatchHandle& h) { if (!bufferQueue.empty()) { // Read into buffer BufferBase* buff = bufferQueue.front(); + assert(buff); bufferQueue.pop_front(); errno = 0; int readCount = buff->byteCount-buff->dataCount; @@ -227,6 +229,7 @@ void AsynchIO::readable(DispatchHandle& h) { } else { // Put buffer back (at front so it doesn't interfere with unread buffers) bufferQueue.push_front(buff); + assert(buff); // Eof or other side has gone away if (rc == 0 || errno == ECONNRESET) { @@ -352,10 +355,10 @@ void AsynchIO::disconnected(DispatchHandle& h) { * Close the socket and callback to say we've done it */ void AsynchIO::close(DispatchHandle& h) { - h.stopWatch(); - h.getSocket().close(); - if (closedCallback) { - closedCallback(*this, getSocket()); - } + h.stopWatch(); + h.getSocket().close(); + if (closedCallback) { + closedCallback(*this, getSocket()); + } } |
