summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp122
1 files changed, 89 insertions, 33 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 473ef7936f..2b462cbd7a 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -24,8 +24,6 @@
#include "check.h"
#include <unistd.h>
-#include <fcntl.h>
-#include <sys/types.h>
#include <sys/socket.h>
#include <signal.h>
#include <errno.h>
@@ -37,13 +35,6 @@ using namespace qpid::sys;
namespace {
/*
- * Make file descriptor non-blocking
- */
-void nonblocking(int fd) {
- QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
-}
-
-/*
* Make *process* not generate SIGPIPE when writing to closed
* pipe/socket (necessary as default action is to terminate process)
*/
@@ -57,11 +48,11 @@ void ignoreSigpipe() {
* Asynch Acceptor
*/
-AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
+AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
acceptedCallback(callback),
- handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
+ handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
- nonblocking(fd);
+ s.setNonblocking();
ignoreSigpipe();
}
@@ -73,18 +64,16 @@ void AsynchAcceptor::start(Poller::shared_ptr poller) {
* We keep on accepting as long as there is something to accept
*/
void AsynchAcceptor::readable(DispatchHandle& h) {
- int afd;
+ Socket* s;
do {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
// log it or use it for connection acceptance.
- afd = ::accept(h.getFD(), 0, 0);
- if (afd >= 0) {
- acceptedCallback(afd);
- } else if (errno == EAGAIN) {
- break;
+ s = h.getSocket().accept(0, 0);
+ if (s) {
+ acceptedCallback(*s);
} else {
- QPID_POSIX_CHECK(afd);
+ break;
}
} while (true);
@@ -94,21 +83,23 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
/*
* Asynch reader/writer
*/
-AsynchIO::AsynchIO(int fd,
+AsynchIO::AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
- BuffersEmptyCallback eCb, IdleCallback iCb) :
+ ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
- DispatchHandle(fd,
+ DispatchHandle(s,
boost::bind(&AsynchIO::readable, this, _1),
boost::bind(&AsynchIO::writeable, this, _1),
boost::bind(&AsynchIO::disconnected, this, _1)),
readCallback(rCb),
eofCallback(eofCb),
disCallback(disCb),
+ closedCallback(cCb),
emptyCallback(eCb),
- idleCallback(iCb) {
+ idleCallback(iCb),
+ queuedClose(false) {
- nonblocking(fd);
+ s.setNonblocking();
}
struct deleter
@@ -131,15 +122,58 @@ void AsynchIO::start(Poller::shared_ptr poller) {
}
void AsynchIO::queueReadBuffer(Buffer* buff) {
+ assert(buff);
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ bufferQueue.push_back(buff);
+ DispatchHandle::rewatchRead();
+}
+
+void AsynchIO::unread(Buffer* buff) {
+ assert(buff);
+ if (buff->dataStart != 0) {
+ memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount);
+ buff->dataStart = 0;
+ }
bufferQueue.push_front(buff);
DispatchHandle::rewatchRead();
}
+// Either queue for writing or announce that there is something to write
+// and we should ask for it
void AsynchIO::queueWrite(Buffer* buff) {
- writeQueue.push_front(buff);
+ // If no buffer then don't queue anything
+ // (but still wake up for writing)
+ if (buff) {
+ // If we've already closed the socket then throw the write away
+ if (queuedClose) {
+ bufferQueue.push_front(buff);
+ return;
+ } else {
+ writeQueue.push_front(buff);
+ }
+ }
DispatchHandle::rewatchWrite();
}
+void AsynchIO::queueWriteClose() {
+ queuedClose = true;
+}
+
+/** Return a queued buffer if there are enough
+ * to spare
+ */
+AsynchIO::Buffer* AsynchIO::getQueuedBuffer() {
+ // Always keep at least one buffer (it might have data that was "unread" in it)
+ if (bufferQueue.size()<=1)
+ return 0;
+ Buffer* buff = bufferQueue.back();
+ buff->dataStart = 0;
+ buff->dataCount = 0;
+ bufferQueue.pop_back();
+ return buff;
+}
+
/*
* We keep on reading as long as we have something to read and a buffer to put
* it in
@@ -149,19 +183,19 @@ void AsynchIO::readable(DispatchHandle& h) {
// (Try to) get a buffer
if (!bufferQueue.empty()) {
// Read into buffer
- Buffer* buff = bufferQueue.back();
- bufferQueue.pop_back();
+ Buffer* buff = bufferQueue.front();
+ bufferQueue.pop_front();
errno = 0;
- int rc = ::read(h.getFD(), buff->bytes, buff->byteCount);
+ int readCount = buff->byteCount-buff->dataCount;
+ int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
if (rc == 0) {
eofCallback(*this);
h.unwatchRead();
return;
} else if (rc > 0) {
- buff->dataStart = 0;
- buff->dataCount = rc;
+ buff->dataCount += rc;
readCallback(*this, buff);
- if (rc != buff->byteCount) {
+ if (rc != readCount) {
// If we didn't fill the read buffer then time to stop reading
return;
}
@@ -209,7 +243,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
writeQueue.pop_back();
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
- int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount);
+ int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
// If we didn't write full buffer put rest back
if (rc != buff->dataCount) {
@@ -238,12 +272,17 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
}
} else {
+ // If we're waiting to close the socket then can do it now as there is nothing to write
+ if (queuedClose) {
+ close(h);
+ return;
+ }
// Fd is writable, but nothing to write
if (idleCallback) {
idleCallback(*this);
}
// If we still have no buffers to write we can't do anything more
- if (writeQueue.empty()) {
+ if (writeQueue.empty() && !queuedClose) {
h.unwatchWrite();
return;
}
@@ -252,8 +291,25 @@ void AsynchIO::writeable(DispatchHandle& h) {
}
void AsynchIO::disconnected(DispatchHandle& h) {
+ // If we've already queued close do it before callback
+ if (queuedClose) {
+ close(h);
+ }
+
if (disCallback) {
disCallback(*this);
h.unwatch();
}
}
+
+/*
+ * Close the socket and callback to say we've done it
+ */
+void AsynchIO::close(DispatchHandle& h) {
+ h.stopWatch();
+ h.getSocket().close();
+ if (closedCallback) {
+ closedCallback(*this, getSocket());
+ }
+}
+