diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-04-15 15:41:21 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-15 15:41:21 +0000 |
| commit | dd53b33c3badd538d2d25a35146d9ab032573cc0 (patch) | |
| tree | 305a9f3e6cdc5d88d6c78638c75dda9d3ddb9831 /cpp/src/qpid/sys/posix | |
| parent | 8ac8e19e4805e78c3adcab66f1aab2ef5190f48e (diff) | |
| download | qpid-python-dd53b33c3badd538d2d25a35146d9ab032573cc0.tar.gz | |
Refactored the IO framework that sits on top of Poller so that it uses a generalised IOHandle.
This means that you can define new classes derived from IOHandle (other than Socket) that
can also be added to a Poller and waited for.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/posix')
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/IOHandle.cpp | 42 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/PrivatePosix.h | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 51 |
4 files changed, 75 insertions, 46 deletions
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 94c68bd5d0..cedad5c011 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -65,7 +65,8 @@ __thread int64_t threadMaxReadTimeNs = 2 * 1000000; // start at 2ms AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback) : acceptedCallback(callback), - handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0) { + handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), + socket(s) { s.setNonblocking(); ignoreSigpipe(); @@ -84,7 +85,7 @@ void AsynchAcceptor::readable(DispatchHandle& h) { errno = 0; // TODO: Currently we ignore the peers address, perhaps we should // log it or use it for connection acceptance. - s = h.getSocket().accept(0, 0); + s = socket.accept(0, 0); if (s) { acceptedCallback(*s); } else { @@ -112,6 +113,7 @@ AsynchIO::AsynchIO(const Socket& s, closedCallback(cCb), emptyCallback(eCb), idleCallback(iCb), + socket(s), queuedClose(false), writePending(false) { @@ -209,7 +211,7 @@ void AsynchIO::readable(DispatchHandle& h) { bufferQueue.pop_front(); errno = 0; int readCount = buff->byteCount-buff->dataCount; - int rc = h.getSocket().read(buff->bytes + buff->dataCount, readCount); + int rc = socket.read(buff->bytes + buff->dataCount, readCount); if (rc > 0) { buff->dataCount += rc; threadReadTotal += rc; @@ -276,7 +278,7 @@ void AsynchIO::writeable(DispatchHandle& h) { writeQueue.pop_back(); errno = 0; assert(buff->dataStart+buff->dataCount <= buff->byteCount); - int rc = h.getSocket().write(buff->bytes+buff->dataStart, buff->dataCount); + int rc = socket.write(buff->bytes+buff->dataStart, buff->dataCount); if (rc >= 0) { threadWriteTotal += rc; writeTotal += rc; @@ -356,9 +358,9 @@ void AsynchIO::disconnected(DispatchHandle& h) { */ void AsynchIO::close(DispatchHandle& h) { h.stopWatch(); - h.getSocket().close(); + socket.close(); if (closedCallback) { - closedCallback(*this, getSocket()); + closedCallback(*this, socket); } } diff --git a/cpp/src/qpid/sys/posix/IOHandle.cpp b/cpp/src/qpid/sys/posix/IOHandle.cpp new file mode 100644 index 0000000000..80b487eadc --- /dev/null +++ b/cpp/src/qpid/sys/posix/IOHandle.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/IOHandle.h" + +#include "PrivatePosix.h" + +namespace qpid { +namespace sys { + +int toFd(const IOHandlePrivate* h) +{ + return h->fd; +} + +IOHandle::IOHandle(IOHandlePrivate* h) : + impl(h) +{} + +IOHandle::~IOHandle() { + delete impl; +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/PrivatePosix.h b/cpp/src/qpid/sys/posix/PrivatePosix.h index 9ec9770cab..33c0cd81bc 100644 --- a/cpp/src/qpid/sys/posix/PrivatePosix.h +++ b/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -35,9 +35,17 @@ struct timespec& toTimespec(struct timespec& ts, const Duration& t); struct timeval& toTimeval(struct timeval& tv, const Duration& t); Duration toTime(const struct timespec& ts); -// Private socket related implementation details -class SocketPrivate; -int toFd(const SocketPrivate* s); +// Private fd related implementation details +class IOHandlePrivate { +public: + IOHandlePrivate(int f = -1) : + fd(f) + {} + + int fd; +}; + +int toFd(const IOHandlePrivate* h); }} diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index c286ebce27..99cf7210b6 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -38,19 +38,8 @@ namespace qpid { namespace sys { -class SocketPrivate { -public: - SocketPrivate(int f = -1) : - fd(f) - {} - - int fd; - - std::string getName(bool local, bool includeService = false) const; - std::string getService(bool local) const; -}; - -std::string SocketPrivate::getName(bool local, bool includeService) const +namespace { +std::string getName(int fd, bool local, bool includeService = false) { ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); @@ -80,7 +69,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const } } -std::string SocketPrivate::getService(bool local) const +std::string getService(int fd, bool local) { ::sockaddr_storage name; // big enough for any socket address ::socklen_t namelen = sizeof(name); @@ -101,21 +90,18 @@ std::string SocketPrivate::getService(bool local) const throw QPID_POSIX_ERROR(rc); return servName; } +} Socket::Socket() : - impl(new SocketPrivate) + IOHandle(new IOHandlePrivate) { createTcp(); } -Socket::Socket(SocketPrivate* sp) : - impl(sp) +Socket::Socket(IOHandlePrivate* h) : + IOHandle(h) {} -Socket::~Socket() { - delete impl; -} - void Socket::createTcp() const { int& socket = impl->fd; @@ -225,7 +211,7 @@ Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const { int afd = ::accept(impl->fd, addr, addrlen); if ( afd >= 0) - return new Socket(new SocketPrivate(afd)); + return new Socket(new IOHandlePrivate(afd)); else if (errno == EAGAIN) return 0; else throw QPID_POSIX_ERROR(errno); @@ -243,41 +229,32 @@ int Socket::write(const void *buf, size_t count) const std::string Socket::getSockname() const { - return impl->getName(true); + return getName(impl->fd, true); } std::string Socket::getPeername() const { - return impl->getName(false); + return getName(impl->fd, false); } std::string Socket::getPeerAddress() const { - return impl->getName(false, true); + return getName(impl->fd, false, true); } std::string Socket::getLocalAddress() const { - return impl->getName(true, true); + return getName(impl->fd, true, true); } uint16_t Socket::getLocalPort() const { - return atoi(impl->getService(true).c_str()); + return atoi(getService(impl->fd, true).c_str()); } uint16_t Socket::getRemotePort() const { - return atoi(impl->getService(true).c_str()); -} - -int Socket::toFd() const { - return impl->fd; -} - -int toFd(const SocketPrivate* s) -{ - return s->fd; + return atoi(getService(impl->fd, true).c_str()); } }} // namespace qpid::sys |
