summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix/AsynchIO.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-06-29 18:56:11 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-06-29 18:56:11 +0000
commita258bbcaf007d20d943669b7ee6223016dd08d66 (patch)
tree0ea6092222386ebed2fb92c1d809d4eb7fa37557 /cpp/src/qpid/sys/posix/AsynchIO.cpp
parentf830f28dca100d70631af25e082000cf7aed540d (diff)
downloadqpid-python-a258bbcaf007d20d943669b7ee6223016dd08d66.tar.gz
* More work on asychronous network IO
* Fix of current EventQueue code to carry on compiling git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@552001 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/posix/AsynchIO.cpp')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp195
1 files changed, 195 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
new file mode 100644
index 0000000000..400c2080b2
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AsynchIO.h"
+
+#include "check.h"
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <errno.h>
+
+#include <boost/bind.hpp>
+
+using namespace qpid::sys;
+
+namespace {
+
+/*
+ * Make file descriptor non-blocking
+ */
+void nonblocking(int fd) {
+ QPID_POSIX_CHECK(::fcntl(fd, F_SETFL, O_NONBLOCK));
+}
+
+}
+
+/*
+ * Asynch Acceptor
+ */
+
+AsynchAcceptor::AsynchAcceptor(int fd, Callback callback) :
+ acceptedCallback(callback),
+ handle(fd, boost::bind(&AsynchAcceptor::readable, this, _1), 0) {
+
+ nonblocking(fd);
+}
+
+void AsynchAcceptor::start(Poller::shared_ptr poller) {
+ handle.watch(poller);
+}
+
+/*
+ * We keep on accepting as long as there is something to accept
+ */
+void AsynchAcceptor::readable(DispatchHandle& h) {
+ int afd;
+ do {
+ errno = 0;
+ // TODO: Currently we ignore the peers address, perhaps we should
+ // log it or use it for connection acceptance.
+ afd = ::accept(h.getFD(), 0, 0);
+ if (afd >= 0) {
+ acceptedCallback(afd);
+ } else if (errno == EAGAIN) {
+ break;
+ } else {
+ QPID_POSIX_CHECK(afd);
+ }
+ } while (true);
+
+ h.rewatch();
+}
+
+/*
+ * Asynch reader/writer
+ */
+AsynchIO::AsynchIO(int fd, ReadCallback rCb, EofCallback eofCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
+ readCallback(rCb),
+ eofCallback(eofCb),
+ emptyCallback(eCb),
+ idleCallback(iCb),
+ handle(fd, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1)) {
+
+ nonblocking(fd);
+}
+
+void AsynchIO::start(Poller::shared_ptr poller) {
+ handle.watch(poller);
+}
+
+void AsynchIO::QueueReadBuffer(const Buffer& buff) {
+ bufferQueue.push_front(buff);
+ handle.rewatchRead();
+}
+
+void AsynchIO::QueueWrite(const Buffer& buff) {
+ writeQueue.push_front(buff);
+ handle.rewatchWrite();
+}
+
+/*
+ * We keep on reading as long as we have something to read and a buffer to put
+ * it in
+ */
+void AsynchIO::readable(DispatchHandle& h) {
+ do {
+ // (Try to) get a buffer
+ if (!bufferQueue.empty()) {
+ // Read into buffer
+ Buffer buff = bufferQueue.back();
+ bufferQueue.pop_back();
+ errno = 0;
+ int rc = ::read(h.getFD(), buff.bytes, buff.byteCount);
+ if (rc == 0) {
+ eofCallback();
+ } else if (rc > 0) {
+ readCallback(buff, rc);
+ } else {
+ // Put buffer back
+ bufferQueue.push_back(buff);
+
+ if (errno == EAGAIN) {
+ // We must have just put a buffer back so we know
+ // we can do this
+ h.rewatchRead();
+ return;
+ } else {
+ QPID_POSIX_CHECK(rc);
+ }
+ }
+ } else {
+ // Something to read but no buffer
+ if (emptyCallback) {
+ emptyCallback();
+ }
+ // If we still have no buffers we can't do anything more
+ if (bufferQueue.empty()) {
+ return;
+ }
+
+ }
+ } while (true);
+}
+
+/*
+ * We carry on writing whilst we have data to write and we can write
+ */
+void AsynchIO::writeable(DispatchHandle& h) {
+ do {
+ // See if we've got something to write
+ if (!writeQueue.empty()) {
+ // Write buffer
+ Buffer buff = writeQueue.back();
+ writeQueue.pop_back();
+ errno = 0;
+ int rc = ::write(h.getFD(), buff.bytes, buff.byteCount);
+ if (rc >= 0) {
+ // Recycle the buffer
+ QueueReadBuffer(buff);
+ } else {
+ // Put buffer back
+ writeQueue.push_back(buff);
+
+ if (errno == EAGAIN) {
+ // We have just put a buffer back so we know
+ // we can do this
+ h.rewatchWrite();
+ return;
+ } else {
+ QPID_POSIX_CHECK(rc);
+ }
+ }
+ } else {
+ // Something to read but no buffer
+ if (idleCallback) {
+ idleCallback(h.getFD());
+ }
+ // If we still have no buffers to write we can't do anything more
+ if (writeQueue.empty()) {
+ return;
+ }
+ }
+ } while (true);
+}
+