diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2007-07-27 17:19:30 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2007-07-27 17:19:30 +0000 |
| commit | 65ea2f177bd0810590895d89a490af8cea60253b (patch) | |
| tree | 1a1432d706ac5f43dc8cdd5fdb0d7b5566dd5d06 /cpp/src/qpid/sys/posix/AsynchIO.cpp | |
| parent | 0a7f3f5dde40e59e90588e4ab7ba2ba99651c0f4 (diff) | |
| download | qpid-python-65ea2f177bd0810590895d89a490af8cea60253b.tar.gz | |
* Asynchronous network IO subsystem
- This is now implemented such that it very nearly only depends on the platform
code (Socker & Poller), this is not 100% true at present, but should be simple
to finish.
- This is still not the default (use "./configure --disable-apr-netio" to get it)
- Interrupting the broker gives a known error
- Default for number of broker io threads is not correct (needs to be number of CPUs -
it will run slower with too many io threads)
* EventChannel code
- Deleted all EventChannel code as it's entirely superceded by this new shiny code ;-)
* Rearranged the platform Socket implementations a bit for better abstraction
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 122 |
1 files changed, 89 insertions, 33 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 473ef7936f..2b462cbd7a 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -24,8 +24,6 @@ #include "check.h" #include <unistd.h> -#include <fcntl.h> -#include <sys/types.h> #include <sys/socket.h> #include <signal.h> #include <errno.h> @@ -37,13 +35,6 @@ using namespace qpid::sys; namespace { /* - * Make file descriptor non-blocking - */ -void nonblocking(int fd) { - QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK)); -} - -/* * Make *process* not generate SIGPIPE when writing to closed * pipe/socket (necessary as default action is to terminate process) */ @@ -57,11 +48,11 @@ void ignoreSigpipe() { * Asynch Acceptor */ -AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) : +AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { + handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { - nonblocking(fd); + s.setNonblocking(); ignoreSigpipe(); } @@ -73,18 +64,16 @@ void AsynchAcceptor::start(Poller::shared_ptr poller) { * We keep on accepting as long as there is something to accept */ void AsynchAcceptor::readable(DispatchHandle& h) { - int afd; + Socket* s; do { errno = 0; // TODO: Currently we ignore the peers address, perhaps we should // log it or use it for connection acceptance. - afd = ::accept(h.getFD(), 0, 0); - if (afd >= 0) { - acceptedCallback(afd); - } else if (errno == EAGAIN) { - break; + s = h.getSocket().accept(0, 0); + if (s) { + acceptedCallback(*s); } else { - QPID_POSIX_CHECK(afd); + break; } } while (true); @@ -94,21 +83,23 @@ void AsynchAcceptor::readable(DispatchHandle& h) { /* * Asynch reader/writer */ -AsynchIO::AsynchIO(int fd, +AsynchIO::AsynchIO(const Socket& s, ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, - BuffersEmptyCallback eCb, IdleCallback iCb) : + ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : - DispatchHandle(fd, + DispatchHandle(s, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1), boost::bind(&AsynchIO::disconnected, this, _1)), readCallback(rCb), eofCallback(eofCb), disCallback(disCb), + closedCallback(cCb), emptyCallback(eCb), - idleCallback(iCb) { + idleCallback(iCb), + queuedClose(false) { - nonblocking(fd); + s.setNonblocking(); } struct deleter @@ -131,15 +122,58 @@ void AsynchIO::start(Poller::shared_ptr poller) { } void AsynchIO::queueReadBuffer(Buffer* buff) { + assert(buff); + buff->dataStart = 0; + buff->dataCount = 0; + bufferQueue.push_back(buff); + DispatchHandle::rewatchRead(); +} + +void AsynchIO::unread(Buffer* buff) { + assert(buff); + if (buff->dataStart != 0) { + memmove(buff->bytes, buff->bytes+buff->dataStart, buff->dataCount); + buff->dataStart = 0; + } bufferQueue.push_front(buff); DispatchHandle::rewatchRead(); } +// Either queue for writing or announce that there is something to write +// and we should ask for it void AsynchIO::queueWrite(Buffer* buff) { - writeQueue.push_front(buff); + // If no buffer then don't queue anything + // (but still wake up for writing) + if (buff) { + // If we've already closed the socket then throw the write away + if (queuedClose) { + bufferQueue.push_front(buff); + return; + } else { + writeQueue.push_front(buff); + } + } DispatchHandle::rewatchWrite(); } +void AsynchIO::queueWriteClose() { + queuedClose = true; +} + +/** Return a queued buffer if there are enough + * to spare + */ +AsynchIO::Buffer* AsynchIO::getQueuedBuffer() { + // Always keep at least one buffer (it might have data that was "unread" in it) + if (bufferQueue.size()<=1) + return 0; + Buffer* buff = bufferQueue.back(); + buff->dataStart = 0; + buff->dataCount = 0; + bufferQueue.pop_back(); + return buff; +} + /* * We keep on reading as long as we have something to read and a buffer to put * it in @@ -149,19 +183,19 @@ void AsynchIO::readable(DispatchHandle& h) { // (Try to) get a buffer if (!bufferQueue.empty()) { // Read into buffer - Buffer* buff = bufferQueue.back(); - bufferQueue.pop_back(); + Buffer* buff = bufferQueue.front(); + bufferQueue.pop_front(); errno = 0; - int rc = ::read(h.getFD(), buff->bytes, buff->byteCount); + int readCount = buff->byteCount-buff->dataCount; + int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount); if (rc == 0) { eofCallback(*this); h.unwatchRead(); return; } else if (rc > 0) { - buff->dataStart = 0; - buff->dataCount = rc; + buff->dataCount += rc; readCallback(*this, buff); - if (rc != buff->byteCount) { + if (rc != readCount) { // If we didn't fill the read buffer then time to stop reading return; } @@ -209,7 +243,7 @@ void AsynchIO::writeable(DispatchHandle& h) { writeQueue.pop_back(); errno = 0; assert(buff->dataStart+buff->dataCount <= buff->byteCount); - int rc = ::write(h.getFD(), buff->bytes+buff->dataStart, buff->dataCount); + int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { // If we didn't write full buffer put rest back if (rc != buff->dataCount) { @@ -238,12 +272,17 @@ void AsynchIO::writeable(DispatchHandle& h) { } } } else { + // If we're waiting to close the socket then can do it now as there is nothing to write + if (queuedClose) { + close(h); + return; + } // Fd is writable, but nothing to write if (idleCallback) { idleCallback(*this); } // If we still have no buffers to write we can't do anything more - if (writeQueue.empty()) { + if (writeQueue.empty() && !queuedClose) { h.unwatchWrite(); return; } @@ -252,8 +291,25 @@ void AsynchIO::writeable(DispatchHandle& h) { } void AsynchIO::disconnected(DispatchHandle& h) { + // If we've already queued close do it before callback + if (queuedClose) { + close(h); + } + if (disCallback) { disCallback(*this); h.unwatch(); } } + +/* + * Close the socket and callback to say we've done it + */ +void AsynchIO::close(DispatchHandle& h) { + h.stopWatch(); + h.getSocket().close(); + if (closedCallback) { + closedCallback(*this, getSocket()); + } +} + |
