summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/posix
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-04-15 15:41:21 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-04-15 15:41:21 +0000
commitdd53b33c3badd538d2d25a35146d9ab032573cc0 (patch)
tree305a9f3e6cdc5d88d6c78638c75dda9d3ddb9831 /cpp/src/qpid/sys/posix
parent8ac8e19e4805e78c3adcab66f1aab2ef5190f48e (diff)
downloadqpid-python-dd53b33c3badd538d2d25a35146d9ab032573cc0.tar.gz
Refactored the IO framework that sits on top of Poller so that it uses a generalised IOHandle.
This means that you can define new classes derived from IOHandle (other than Socket) that can also be added to a Poller and waited for. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/posix')
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp14
-rw-r--r--cpp/src/qpid/sys/posix/IOHandle.cpp42
-rw-r--r--cpp/src/qpid/sys/posix/PrivatePosix.h14
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp51
4 files changed, 75 insertions, 46 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 94c68bd5d0..cedad5c011 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -65,7 +65,8 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) {
+ handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+ socket(s) {
s.setNonblocking();
ignoreSigpipe();
@@ -84,7 +85,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) {
errno = 0;
// TODO: Currently we ignore the peers address, perhaps we should
// log it or use it for connection acceptance.
- s = h.getSocket().accept(0, 0);
+ s = socket.accept(0, 0);
if (s) {
acceptedCallback(*s);
} else {
@@ -112,6 +113,7 @@ AsynchIO::AsynchIO(const Socket& s,
closedCallback(cCb),
emptyCallback(eCb),
idleCallback(iCb),
+ socket(s),
queuedClose(false),
writePending(false) {
@@ -209,7 +211,7 @@ void AsynchIO::readable(DispatchHandle& h) {
bufferQueue.pop_front();
errno = 0;
int readCount = buff->byteCount-buff->dataCount;
- int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount);
+ int rc = socket.read(buff->bytes + buff->dataCount, readCount);
if (rc > 0) {
buff->dataCount += rc;
threadReadTotal += rc;
@@ -276,7 +278,7 @@ void AsynchIO::writeable(DispatchHandle& h) {
writeQueue.pop_back();
errno = 0;
assert(buff->dataStart+buff->dataCount <= buff->byteCount);
- int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount);
+ int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount);
if (rc >= 0) {
threadWriteTotal += rc;
writeTotal += rc;
@@ -356,9 +358,9 @@ void AsynchIO::disconnected(DispatchHandle& h) {
*/
void AsynchIO::close(DispatchHandle& h) {
h.stopWatch();
- h.getSocket().close();
+ socket.close();
if (closedCallback) {
- closedCallback(*this, getSocket());
+ closedCallback(*this, socket);
}
}
diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp
new file mode 100644
index 0000000000..80b487eadc
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/IOHandle.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+
+#include "PrivatePosix.h"
+
+namespace qpid {
+namespace sys {
+
+int toFd(const IOHandlePrivate* h)
+{
+ return h->fd;
+}
+
+IOHandle::IOHandle(IOHandlePrivate* h) :
+ impl(h)
+{}
+
+IOHandle::~IOHandle() {
+ delete impl;
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h
index 9ec9770cab..33c0cd81bc 100644
--- a/cpp/src/qpid/sys/posix/PrivatePosix.h
+++ b/cpp/src/qpid/sys/posix/PrivatePosix.h
@@ -35,9 +35,17 @@ struct timespec& toTimespec(struct timespec& ts, const Duration& t);
struct timeval& toTimeval(struct timeval& tv, const Duration& t);
Duration toTime(const struct timespec& ts);
-// Private socket related implementation details
-class SocketPrivate;
-int toFd(const SocketPrivate* s);
+// Private fd related implementation details
+class IOHandlePrivate {
+public:
+ IOHandlePrivate(int f = -1) :
+ fd(f)
+ {}
+
+ int fd;
+};
+
+int toFd(const IOHandlePrivate* h);
}}
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index c286ebce27..99cf7210b6 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -38,19 +38,8 @@
namespace qpid {
namespace sys {
-class SocketPrivate {
-public:
- SocketPrivate(int f = -1) :
- fd(f)
- {}
-
- int fd;
-
- std::string getName(bool local, bool includeService = false) const;
- std::string getService(bool local) const;
-};
-
-std::string SocketPrivate::getName(bool local, bool includeService) const
+namespace {
+std::string getName(int fd, bool local, bool includeService = false)
{
::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
@@ -80,7 +69,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const
}
}
-std::string SocketPrivate::getService(bool local) const
+std::string getService(int fd, bool local)
{
::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
@@ -101,21 +90,18 @@ std::string SocketPrivate::getService(bool local) const
throw QPID_POSIX_ERROR(rc);
return servName;
}
+}
Socket::Socket() :
- impl(new SocketPrivate)
+ IOHandle(new IOHandlePrivate)
{
createTcp();
}
-Socket::Socket(SocketPrivate* sp) :
- impl(sp)
+Socket::Socket(IOHandlePrivate* h) :
+ IOHandle(h)
{}
-Socket::~Socket() {
- delete impl;
-}
-
void Socket::createTcp() const
{
int& socket = impl->fd;
@@ -225,7 +211,7 @@ Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
{
int afd = ::accept(impl->fd, addr, addrlen);
if ( afd >= 0)
- return new Socket(new SocketPrivate(afd));
+ return new Socket(new IOHandlePrivate(afd));
else if (errno == EAGAIN)
return 0;
else throw QPID_POSIX_ERROR(errno);
@@ -243,41 +229,32 @@ int Socket::write(const void *buf, size_t count) const
std::string Socket::getSockname() const
{
- return impl->getName(true);
+ return getName(impl->fd, true);
}
std::string Socket::getPeername() const
{
- return impl->getName(false);
+ return getName(impl->fd, false);
}
std::string Socket::getPeerAddress() const
{
- return impl->getName(false, true);
+ return getName(impl->fd, false, true);
}
std::string Socket::getLocalAddress() const
{
- return impl->getName(true, true);
+ return getName(impl->fd, true, true);
}
uint16_t Socket::getLocalPort() const
{
- return atoi(impl->getService(true).c_str());
+ return atoi(getService(impl->fd, true).c_str());
}
uint16_t Socket::getRemotePort() const
{
- return atoi(impl->getService(true).c_str());
-}
-
-int Socket::toFd() const {
- return impl->fd;
-}
-
-int toFd(const SocketPrivate* s)
-{
- return s->fd;
+ return atoi(getService(impl->fd, true).c_str());
}
}} // namespace qpid::sys