summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp120
1 files changed, 92 insertions, 28 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 400c2080b2..473ef7936f 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -27,6 +27,7 @@
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
+#include <signal.h>
#include <errno.h>
#include <boost/bind.hpp>
@@ -42,6 +43,14 @@ void nonblocking(int fd) {
QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
}
+/*
+ * Make *process* not generate SIGPIPE when writing to closed
+ * pipe/socket (necessary as default action is to terminate process)
+ */
+void ignoreSigpipe() {
+ ::signal(SIGPIPE, SIG_IGN);
+}
+
}
/*
@@ -50,13 +59,14 @@ void nonblocking(int fd) {
AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
acceptedCallback(callback),
- handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0) {
+ handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
nonblocking(fd);
+ ignoreSigpipe();
}
void AsynchAcceptor::start(Poller::shared_ptr poller) {
- handle.watch(poller);
+ handle.startWatch(poller);
}
/*
@@ -84,28 +94,50 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
/*
* Asynch reader/writer
*/
-AsynchIO::AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
+AsynchIO::AsynchIO(int fd,
+ ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
+ BuffersEmptyCallback eCb, IdleCallback iCb) :
+
+ DispatchHandle(fd,
+ boost::bind(&AsynchIO::readable, this, _1),
+ boost::bind(&AsynchIO::writeable, this, _1),
+ boost::bind(&AsynchIO::disconnected, this, _1)),
readCallback(rCb),
eofCallback(eofCb),
+ disCallback(disCb),
emptyCallback(eCb),
- idleCallback(iCb),
- handle(fd, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1)) {
+ idleCallback(iCb) {
nonblocking(fd);
}
+struct deleter
+{
+ template <typename T>
+ void operator()(T *ptr){ delete ptr;}
+};
+
+AsynchIO::~AsynchIO() {
+ std::for_each( bufferQueue.begin(), bufferQueue.end(), deleter());
+ std::for_each( writeQueue.begin(), writeQueue.end(), deleter());
+}
+
+void AsynchIO::queueForDeletion() {
+ DispatchHandle::doDelete();
+}
+
void AsynchIO::start(Poller::shared_ptr poller) {
- handle.watch(poller);
+ DispatchHandle::startWatch(poller);
}
-void AsynchIO::QueueReadBuffer(const Buffer& buff) {
+void AsynchIO::queueReadBuffer(Buffer* buff) {
bufferQueue.push_front(buff);
- handle.rewatchRead();
+ DispatchHandle::rewatchRead();
}
-void AsynchIO::QueueWrite(const Buffer& buff) {
+void AsynchIO::queueWrite(Buffer* buff) {
writeQueue.push_front(buff);
- handle.rewatchWrite();
+ DispatchHandle::rewatchWrite();
}
/*
@@ -117,22 +149,34 @@ void AsynchIO::readable(DispatchHandle& h) {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
// Read into buffer
- Buffer buff = bufferQueue.back();
+ Buffer* buff = bufferQueue.back();
bufferQueue.pop_back();
errno = 0;
- int rc = ::read(h.getFD(), buff.bytes, buff.byteCount);
+ int rc = ::read(h.getFD(), buff->bytes, buff->byteCount);
if (rc == 0) {
- eofCallback();
+ eofCallback(*this);
+ h.unwatchRead();
+ return;
} else if (rc > 0) {
- readCallback(buff, rc);
+ buff->dataStart = 0;
+ buff->dataCount = rc;
+ readCallback(*this, buff);
+ if (rc != buff->byteCount) {
+ // If we didn't fill the read buffer then time to stop reading
+ return;
+ }
} else {
// Put buffer back
bufferQueue.push_back(buff);
- if (errno == EAGAIN) {
- // We must have just put a buffer back so we know
- // we can do this
- h.rewatchRead();
+ // This is effectively the same as eof
+ if (errno == ECONNRESET) {
+ eofCallback(*this);
+ h.unwatchRead();
+ return;
+ } else if (errno == EAGAIN) {
+ // We have just put a buffer back so we know
+ // we can carry on watching for reads
return;
} else {
QPID_POSIX_CHECK(rc);
@@ -141,10 +185,11 @@ void AsynchIO::readable(DispatchHandle& h) {
} else {
// Something to read but no buffer
if (emptyCallback) {
- emptyCallback();
+ emptyCallback(*this);
}
// If we still have no buffers we can't do anything more
if (bufferQueue.empty()) {
+ h.unwatchRead();
return;
}
@@ -160,36 +205,55 @@ void AsynchIO::writeable(DispatchHandle& h) {
// See if we've got something to write
if (!writeQueue.empty()) {
// Write buffer
- Buffer buff = writeQueue.back();
+ Buffer* buff = writeQueue.back();
writeQueue.pop_back();
errno = 0;
- int rc = ::write(h.getFD(), buff.bytes, buff.byteCount);
+ assert(buff->dataStart+buff->dataCount <= buff->byteCount);
+ int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
+ // If we didn't write full buffer put rest back
+ if (rc != buff->dataCount) {
+ buff->dataStart += rc;
+ buff->dataCount -= rc;
+ writeQueue.push_back(buff);
+ return;
+ }
+
// Recycle the buffer
- QueueReadBuffer(buff);
+ queueReadBuffer(buff);
} else {
// Put buffer back
writeQueue.push_back(buff);
-
- if (errno == EAGAIN) {
+ if (errno == ECONNRESET || errno == EPIPE) {
+ // Just stop watching for write here - we'll get a
+ // disconnect callback soon enough
+ h.unwatchWrite();
+ return;
+ } else if (errno == EAGAIN) {
// We have just put a buffer back so we know
- // we can do this
- h.rewatchWrite();
+ // we can carry on watching for writes
return;
} else {
QPID_POSIX_CHECK(rc);
}
}
} else {
- // Something to read but no buffer
+ // Fd is writable, but nothing to write
if (idleCallback) {
- idleCallback(h.getFD());
+ idleCallback(*this);
}
// If we still have no buffers to write we can't do anything more
if (writeQueue.empty()) {
+ h.unwatchWrite();
return;
}
}
} while (true);
}
+void AsynchIO::disconnected(DispatchHandle& h) {
+ if (disCallback) {
+ disCallback(*this);
+ h.unwatch();
+ }
+}