diff options
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 120 |
1 files changed, 92 insertions, 28 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 400c2080b2..473ef7936f 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -27,6 +27,7 @@ #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> +#include <signal.h> #include <errno.h> #include <boost/bind.hpp> @@ -42,6 +43,14 @@ void nonblocking(int fd) { QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK)); } +/* + * Make *process* not generate SIGPIPE when writing to closed + * pipe/socket (necessary as default action is to terminate process) + */ +void ignoreSigpipe() { + ::signal(SIGPIPE, SIG_IGN); +} + } /* @@ -50,13 +59,14 @@ void nonblocking(int fd) { AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) : acceptedCallback(callback), - handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0) { + handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { nonblocking(fd); + ignoreSigpipe(); } void AsynchAcceptor::start(Poller::shared_ptr poller) { - handle.watch(poller); + handle.startWatch(poller); } /* @@ -84,28 +94,50 @@ void AsynchAcceptor::readable(DispatchHandle& h) { /* * Asynch reader/writer */ -AsynchIO::AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb, IdleCallback iCb) : +AsynchIO::AsynchIO(int fd, + ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, + BuffersEmptyCallback eCb, IdleCallback iCb) : + + DispatchHandle(fd, + boost::bind(&AsynchIO::readable, this, _1), + boost::bind(&AsynchIO::writeable, this, _1), + boost::bind(&AsynchIO::disconnected, this, _1)), readCallback(rCb), eofCallback(eofCb), + disCallback(disCb), emptyCallback(eCb), - idleCallback(iCb), - handle(fd, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1)) { + idleCallback(iCb) { nonblocking(fd); } +struct deleter +{ + template <typename T> + void operator()(T *ptr){ delete ptr;} +}; + +AsynchIO::~AsynchIO() { + std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter()); + std::for_each( writeQueue.begin(), writeQueue.end(), deleter()); +} + +void AsynchIO::queueForDeletion() { + DispatchHandle::doDelete(); +} + void AsynchIO::start(Poller::shared_ptr poller) { - handle.watch(poller); + DispatchHandle::startWatch(poller); } -void AsynchIO::QueueReadBuffer(const Buffer& buff) { +void AsynchIO::queueReadBuffer(Buffer* buff) { bufferQueue.push_front(buff); - handle.rewatchRead(); + DispatchHandle::rewatchRead(); } -void AsynchIO::QueueWrite(const Buffer& buff) { +void AsynchIO::queueWrite(Buffer* buff) { writeQueue.push_front(buff); - handle.rewatchWrite(); + DispatchHandle::rewatchWrite(); } /* @@ -117,22 +149,34 @@ void AsynchIO::readable(DispatchHandle& h) { // (Try to) get a buffer if (!bufferQueue.empty()) { // Read into buffer - Buffer buff = bufferQueue.back(); + Buffer* buff = bufferQueue.back(); bufferQueue.pop_back(); errno = 0; - int rc = ::read(h.getFD(), buff.bytes, buff.byteCount); + int rc = ::read(h.getFD(), buff->bytes, buff->byteCount); if (rc == 0) { - eofCallback(); + eofCallback(*this); + h.unwatchRead(); + return; } else if (rc > 0) { - readCallback(buff, rc); + buff->dataStart = 0; + buff->dataCount = rc; + readCallback(*this, buff); + if (rc != buff->byteCount) { + // If we didn't fill the read buffer then time to stop reading + return; + } } else { // Put buffer back bufferQueue.push_back(buff); - if (errno == EAGAIN) { - // We must have just put a buffer back so we know - // we can do this - h.rewatchRead(); + // This is effectively the same as eof + if (errno == ECONNRESET) { + eofCallback(*this); + h.unwatchRead(); + return; + } else if (errno == EAGAIN) { + // We have just put a buffer back so we know + // we can carry on watching for reads return; } else { QPID_POSIX_CHECK(rc); @@ -141,10 +185,11 @@ void AsynchIO::readable(DispatchHandle& h) { } else { // Something to read but no buffer if (emptyCallback) { - emptyCallback(); + emptyCallback(*this); } // If we still have no buffers we can't do anything more if (bufferQueue.empty()) { + h.unwatchRead(); return; } @@ -160,36 +205,55 @@ void AsynchIO::writeable(DispatchHandle& h) { // See if we've got something to write if (!writeQueue.empty()) { // Write buffer - Buffer buff = writeQueue.back(); + Buffer* buff = writeQueue.back(); writeQueue.pop_back(); errno = 0; - int rc = ::write(h.getFD(), buff.bytes, buff.byteCount); + assert(buff->dataStart+buff->dataCount <= buff->byteCount); + int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { + // If we didn't write full buffer put rest back + if (rc != buff->dataCount) { + buff->dataStart += rc; + buff->dataCount -= rc; + writeQueue.push_back(buff); + return; + } + // Recycle the buffer - QueueReadBuffer(buff); + queueReadBuffer(buff); } else { // Put buffer back writeQueue.push_back(buff); - - if (errno == EAGAIN) { + if (errno == ECONNRESET || errno == EPIPE) { + // Just stop watching for write here - we'll get a + // disconnect callback soon enough + h.unwatchWrite(); + return; + } else if (errno == EAGAIN) { // We have just put a buffer back so we know - // we can do this - h.rewatchWrite(); + // we can carry on watching for writes return; } else { QPID_POSIX_CHECK(rc); } } } else { - // Something to read but no buffer + // Fd is writable, but nothing to write if (idleCallback) { - idleCallback(h.getFD()); + idleCallback(*this); } // If we still have no buffers to write we can't do anything more if (writeQueue.empty()) { + h.unwatchWrite(); return; } } } while (true); } +void AsynchIO::disconnected(DispatchHandle& h) { + if (disCallback) { + disCallback(*this); + h.unwatch(); + } +} |
