diff options
| author | Alan Conway <aconway@apache.org> | 2008-01-10 22:50:23 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-01-10 22:50:23 +0000 |
| commit | 6005ad88685a4bad8bdaa986a8b94fdffc51b31e (patch) | |
| tree | 3f263060806fa3e4d92265c3e43e0eba3c829d7a /cpp/src/qpid/sys/posix/AsynchIO.cpp | |
| parent | 1b7c1ba40170027956d7df585eaae385d2511669 (diff) | |
| download | qpid-python-6005ad88685a4bad8bdaa986a8b94fdffc51b31e.tar.gz | |
Client always collects at least an entire frameset into a single buffer
when possible. Based on patch from Gordon Sim.
- Refactor Connector::writebuff, ::send as Connector::Writer
- Collect frames up to EOF notifying AIO write.
- Encode all available complete framesets into buffers as compactly as possible.
- Logging buffer size and frames encoded per write for client and broker.
- framing::Buffer added getPosition(), getSize(), default ctor, copy ctor.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@610972 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 59 |
1 files changed, 31 insertions, 28 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index f8aaa38cf5..94c68bd5d0 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -42,7 +42,7 @@ namespace { * pipe/socket (necessary as default action is to terminate process) */ void ignoreSigpipe() { - ::signal(SIGPIPE, SIG_IGN); + ::signal(SIGPIPE, SIG_IGN); } /* @@ -88,7 +88,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) { if (s) { acceptedCallback(*s); } else { - break; + break; } } while (true); @@ -99,13 +99,13 @@ void AsynchAcceptor::readable(DispatchHandle& h) { * Asynch reader/writer */ AsynchIO::AsynchIO(const Socket& s, - ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : DispatchHandle(s, - boost::bind(&AsynchIO::readable, this, _1), - boost::bind(&AsynchIO::writeable, this, _1), - boost::bind(&AsynchIO::disconnected, this, _1)), + boost::bind(&AsynchIO::readable, this, _1), + boost::bind(&AsynchIO::writeable, this, _1), + boost::bind(&AsynchIO::disconnected, this, _1)), readCallback(rCb), eofCallback(eofCb), disCallback(disCb), @@ -120,8 +120,8 @@ AsynchIO::AsynchIO(const Socket& s, struct deleter { - template <typename T> - void operator()(T *ptr){ delete ptr;} + template <typename T> + void operator()(T *ptr){ delete ptr;} }; AsynchIO::~AsynchIO() { @@ -138,7 +138,7 @@ void AsynchIO::start(Poller::shared_ptr poller) { } void AsynchIO::queueReadBuffer(BufferBase* buff) { - assert(buff); + assert(buff); buff->dataStart = 0; buff->dataCount = 0; bufferQueue.push_back(buff); @@ -146,11 +146,11 @@ void AsynchIO::queueReadBuffer(BufferBase* buff) { } void AsynchIO::unread(BufferBase* buff) { - assert(buff); - if (buff->dataStart != 0) { - memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); - buff->dataStart = 0; - } + assert(buff); + if (buff->dataStart != 0) { + memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); + buff->dataStart = 0; + } bufferQueue.push_front(buff); DispatchHandle::rewatchRead(); } @@ -182,14 +182,15 @@ void AsynchIO::queueWriteClose() { * to spare */ AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() { - // Always keep at least one buffer (it might have data that was "unread" in it) - if (bufferQueue.size()<=1) - return 0; - BufferBase* buff = bufferQueue.back(); - buff->dataStart = 0; - buff->dataCount = 0; - bufferQueue.pop_back(); - return buff; + // Always keep at least one buffer (it might have data that was "unread" in it) + if (bufferQueue.size()<=1) + return 0; + BufferBase* buff = bufferQueue.back(); + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + bufferQueue.pop_back(); + return buff; } /* @@ -204,6 +205,7 @@ void AsynchIO::readable(DispatchHandle& h) { if (!bufferQueue.empty()) { // Read into buffer BufferBase* buff = bufferQueue.front(); + assert(buff); bufferQueue.pop_front(); errno = 0; int readCount = buff->byteCount-buff->dataCount; @@ -227,6 +229,7 @@ void AsynchIO::readable(DispatchHandle& h) { } else { // Put buffer back (at front so it doesn't interfere with unread buffers) bufferQueue.push_front(buff); + assert(buff); // Eof or other side has gone away if (rc == 0 || errno == ECONNRESET) { @@ -352,10 +355,10 @@ void AsynchIO::disconnected(DispatchHandle& h) { * Close the socket and callback to say we've done it */ void AsynchIO::close(DispatchHandle& h) { - h.stopWatch(); - h.getSocket().close(); - if (closedCallback) { - closedCallback(*this, getSocket()); - } + h.stopWatch(); + h.getSocket().close(); + if (closedCallback) { + closedCallback(*this, getSocket()); + } } |
