From f0a31beb7a609591e7b34e60ddfd85e9e183fbc0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 24 Jan 2008 22:26:12 +0000 Subject: Improved/additional client API tests. - Replaced InProcessBroker with a more accurate loopback BrokerFixture. - Added asserts for mutex/condition/thread errors in debug build. - Added client tests for several exception conditions. - Added peer address to log ouput, client/server distinguished by (addr) or [addr] - Fixed various deadlocks & races exposed by the new asserts & tests. File-by-file: New BrokerFixture replaces InProcessBroker D src/tests/InProcessBroker.h M src/tests/BrokerFixture.h M src/tests/SocketProxy.h M src/tests/Makefile.am Made it run a bit faster. M src/tests/quick_perftest Redundant D src/tests/APRBaseTest.cpp Updated tests to use BrokerFixture M src/tests/ClientChannelTest.cpp M src/tests/exception_test.cpp M src/tests/ClientSessionTest.cpp Print thread IDs in decimal, same as GDB. M src/qpid/log/Logger.cpp Assert mutex/condition ops in debug build. M src/qpid/sys/posix/check.h M src/qpid/sys/posix/Mutex.h M src/qpid/sys/posix/Condition.h M src/qpid/sys/posix/Thread.h Added toFd() so SocketProxy can use ::select() M src/qpid/sys/Socket.h M src/qpid/sys/posix/Socket.cpp Fixes for races & deadlocks shown up by new tests & asserts. Mostly shutdown/close issues. M src/qpid/client/ConnectionHandler.h M src/qpid/client/ConnectionImpl.cpp M src/qpid/client/Demux.h M src/qpid/client/SessionCore.cpp M src/qpid/client/ConnectionHandler.cpp M src/qpid/client/Connector.h M src/qpid/client/Demux.cpp M src/qpid/client/Dispatcher.cpp M src/qpid/client/ConnectionImpl.h Logging peer address. M src/qpid/sys/AsynchIOAcceptor.cpp git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@615063 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/SocketProxy.h | 158 +++++++++++++++++++++++++++++++++----------- 1 file changed, 120 insertions(+), 38 deletions(-) (limited to 'cpp/src/tests/SocketProxy.h') diff --git a/cpp/src/tests/SocketProxy.h b/cpp/src/tests/SocketProxy.h index b985ded175..a37c1f2c3e 100644 --- a/cpp/src/tests/SocketProxy.h +++ b/cpp/src/tests/SocketProxy.h @@ -24,59 +24,141 @@ #include "qpid/sys/Socket.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/Mutex.h" +#include "qpid/client/Connection.h" +#include "qpid/log/Statement.h" + +#include /** - * A simple socket proxy that forwards to another socket. Used between - * client & broker to simulate network failures. + * A simple socket proxy that forwards to another socket. + * Used between client & local broker to simulate network failures. */ -struct SocketProxy : public qpid::sys::Runnable +class SocketProxy : private qpid::sys::Runnable { - int port; // Port bound to server socket. - qpid::sys::Socket client, server; // Client & server sockets. + public: + /** Connect to connectPort on host, start a forwarding thread. + * Listen for connection on getPort(). + */ + SocketProxy(int connectPort, const std::string host="localhost") + : closed(false), port(listener.listen()) + { + int r=::pipe(closePipe); + if (r<0) throwErrno(QPID_MSG("::pipe returned " << r)); + client.connect(host, connectPort); + thread = qpid::sys::Thread(static_cast(this)); + } + + ~SocketProxy() { close(); } - SocketProxy(const std::string& host, int port) { init(host,port); } - SocketProxy(int port) { init("localhost",port); } + /** Simulate a network disconnect. */ + void close() { + { + qpid::sys::Mutex::ScopedLock l(lock); + if (closed) return; + closed=true; + } + write(closePipe[1], this, 1); // Random byte to closePipe + thread.join(); + client.close(); + ::close(closePipe[0]); + ::close(closePipe[1]); + } - ~SocketProxy() { client.close(); server.close(); thread.join(); } + bool isClosed() const { + qpid::sys::Mutex::ScopedLock l(lock); + return closed; + } + + uint16_t getPort() const { return port; } private: - - void init(const std::string& host, int connectPort) { - client.connect(host, connectPort); - port = server.listen(); - thread=qpid::sys::Thread(this); + static void throwErrno(const std::string& msg) { + throw qpid::Exception(msg+":"+qpid::strError(errno)); } - - void run() { - try { - do { - ssize_t recv = server.recv(buffer, sizeof(buffer)); - if (recv <= 0) return; - ssize_t sent=client.send(buffer, recv); - if (sent < 0) return; - assert(sent == recv); // Assumes we can send as we receive. - } while (true); - } catch(...) {} + static void throwIf(bool condition, const std::string& msg) { + if (condition) throw qpid::Exception(msg); } + + struct FdSet : fd_set { + FdSet() : maxFd(0) { clear(); } + void clear() { FD_ZERO(this); } + void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); } + bool isSet(int fd) const { return FD_ISSET(fd, this); } + bool operator[](int fd) const { return isSet(fd); } - qpid::sys::Thread thread; - char buffer[64*1024]; -}; + int maxFd; + }; -/** A local client connection via a socket proxy. */ -struct ProxyConnection : public qpid::client::Connection { - SocketProxy proxy; - qpid::client::Session_0_10 session; + enum { RD=1, WR=2, ER=4 }; - ProxyConnection(const std::string& host, int port) : proxy(port) { - open(host, proxy.port); - session=newSession(); - } + struct Selector { + FdSet rd, wr, er; + + void set(int fd, int sets) { + if (sets & RD) rd.set(fd); + if (sets & WR) wr.set(fd); + if (sets & ER) er.set(fd); + } + + int select() { + for (;;) { + int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd)); + int r = ::select(maxFd + 1, &rd, &wr, &er, NULL); + if (r == -1 && errno == EINTR) continue; + if (r < 0) throwErrno(QPID_MSG("select returned " < server; + try { + // Accept incoming connections, watch closePipe. + Selector accept; + accept.set(listener.toFd(), RD|ER); + accept.set(closePipe[0], RD|ER); + accept.select(); + throwIf(accept.rd[closePipe[0]], "Closed by close()"); + throwIf(!accept.rd[listener.toFd()],"Accept failed"); + server.reset(listener.accept(0, 0)); + + // Pump data between client & server sockets, watch closePipe. + char buffer[1024]; + for (;;) { + Selector select; + select.set(server->toFd(), RD|ER); + select.set(client.toFd(), RD|ER); + select.set(closePipe[0], RD|ER); + select.select(); + throwIf(select.rd[closePipe[0]], "Closed by close()"); + // Read even if fd is in error to throw a useful exception. + bool gotData=false; + if (select.rd[server->toFd()] || select.er[server->toFd()]) { + client.write(buffer, server->read(buffer, sizeof(buffer))); + gotData=true; + } + if (select.rd[client.toFd()] || select.er[client.toFd()]) { + server->write(buffer, client.read(buffer, sizeof(buffer))); + gotData=true; + } + throwIf(!gotData, "No data from select()"); + } + } + catch (const std::exception& e) { + QPID_LOG(debug, "SocketProxy::run exiting: " << e.what()); + } + if (server.get()) server->close(); + close(); } + + mutable qpid::sys::Mutex lock; + bool closed; + qpid::sys::Socket client, listener; + uint16_t port; + int closePipe[2]; + qpid::sys::Thread thread; }; #endif -- cgit v1.2.1