From 4737a65543783790cd1bd3be66ef66956746e12e Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Thu, 28 May 2009 05:18:09 +0000 Subject: QPID-1879 Don't use a thread for every new client Connection - By default the max number of threads now used for network io is the number of cpus available. - This can be overridden with the QPID_MAX_IOTHREADS environment variable or the config file git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@779435 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/ConnectionImpl.cpp | 92 ++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 4 deletions(-) (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp') diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 6639f92324..ccaa8c0b87 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,7 +18,13 @@ * under the License. * */ + #include "ConnectionImpl.h" + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" @@ -28,11 +34,16 @@ #include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/Options.h" #include #include +#include #include +#include namespace qpid { namespace client { @@ -42,7 +53,10 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -// Get timer singleton +namespace { +// Maybe should amalgamate the singletons into a single client singleton + +// Get timer singleton Timer& theTimer() { static Mutex timerInitLock; ScopedLock l(timerInitLock); @@ -51,6 +65,73 @@ Timer& theTimer() { return t; } +struct IOThreadOptions : public qpid::Options { + int maxIOThreads; + + IOThreadOptions(int c) : + Options("IO threading options"), + maxIOThreads(c) + { + addOptions() + ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use"); + } +}; + +// IO threads +class IOThread { + int maxIOThreads; + int ioThreads; + int connections; + Mutex threadLock; + std::vector t; + Poller::shared_ptr poller_; + +public: + void add() { + ScopedLock l(threadLock); + ++connections; + if (ioThreads < maxIOThreads) { + QPID_LOG(debug, "Created IO thread: " << ioThreads); + ++ioThreads; + t.push_back( Thread(poller_.get()) ); + } + } + + void sub() { + ScopedLock l(threadLock); + --connections; + } + + Poller::shared_ptr poller() const { + return poller_; + } + + // Here is where the maximum number of threads is set + IOThread(int c) : + ioThreads(0), + connections(0), + poller_(new Poller) + { + IOThreadOptions options(c); + options.parse(0, 0, QPIDC_CONF_FILE, true); + maxIOThreads = (options.maxIOThreads != -1) ? + options.maxIOThreads : 1; + } + + // We can't destroy threads one-by-one as the only + // control we have is to shutdown the whole lot + // and we can't do that before we're unloaded as we can't + // restart the Poller after shutting it down + ~IOThread() { + poller_->shutdown(); + for (int i=0; iclose(); + io.sub(); } void ConnectionImpl::addSession(const boost::shared_ptr& session, uint16_t channel) @@ -126,7 +210,6 @@ bool ConnectionImpl::isOpen() const return handler.isOpen(); } - void ConnectionImpl::open() { const std::string& protocol = handler.protocol; @@ -134,7 +217,8 @@ void ConnectionImpl::open() int port = handler.port; QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); - connector.reset(Connector::create(protocol, version, handler, this)); + io.add(); + connector.reset(Connector::create(protocol, io.poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); connector->connect(host, port); @@ -238,7 +322,7 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() { return handler; } - + std::vector ConnectionImpl::getKnownBrokers() { return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } -- cgit v1.2.1