From df3fe9778d87dd256a2d4c08146d86830ac1e8be Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Thu, 21 Jan 2010 06:17:10 +0000 Subject: QPID-1879 Don't use a thread for every new client Connection - By default the max number of threads now used for network io is the number of cpus available. - This can be overridden with the QPID_MAX_IOTHREADS environment variable or the config file - The client threads are initialised (via a singleton) when first used in a Connection::open() git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901550 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/ConnectionImpl.cpp | 95 ++++++++++++++++++++++++++++++++-- 1 file changed, 91 insertions(+), 4 deletions(-) (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp') diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index cede7f7310..f348493fd0 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,7 +18,9 @@ * under the License. * */ + #include "qpid/client/ConnectionImpl.h" + #include "qpid/client/Connector.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/client/SessionImpl.h" @@ -27,11 +29,20 @@ #include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/Options.h" #include #include +#include #include +#include + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif namespace qpid { namespace client { @@ -41,7 +52,10 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -// Get timer singleton +namespace { +// Maybe should amalgamate the singletons into a single client singleton + +// Get timer singleton Timer& theTimer() { static Mutex timerInitLock; ScopedLock l(timerInitLock); @@ -50,6 +64,76 @@ Timer& theTimer() { return t; } +struct IOThreadOptions : public qpid::Options { + int maxIOThreads; + + IOThreadOptions(int c) : + Options("IO threading options"), + maxIOThreads(c) + { + addOptions() + ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use"); + } +}; + +// IO threads +class IOThread { + int maxIOThreads; + int ioThreads; + int connections; + Mutex threadLock; + std::vector t; + Poller::shared_ptr poller_; + +public: + void add() { + ScopedLock l(threadLock); + ++connections; + if (ioThreads < maxIOThreads) { + QPID_LOG(debug, "Created IO thread: " << ioThreads); + ++ioThreads; + t.push_back( Thread(poller_.get()) ); + } + } + + void sub() { + ScopedLock l(threadLock); + --connections; + } + + Poller::shared_ptr poller() const { + return poller_; + } + + // Here is where the maximum number of threads is set + IOThread(int c) : + ioThreads(0), + connections(0), + poller_(new Poller) + { + IOThreadOptions options(c); + options.parse(0, 0, QPIDC_CONF_FILE, true); + maxIOThreads = (options.maxIOThreads != -1) ? + options.maxIOThreads : 1; + } + + // We can't destroy threads one-by-one as the only + // control we have is to shutdown the whole lot + // and we can't do that before we're unloaded as we can't + // restart the Poller after shutting it down + ~IOThread() { + poller_->shutdown(); + for (int i=0; iclose(); + theIO().sub(); } void ConnectionImpl::addSession(const boost::shared_ptr& session, uint16_t channel) @@ -131,11 +218,10 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) } bool ConnectionImpl::isOpen() const -{ +{ return handler.isOpen(); } - void ConnectionImpl::open() { const std::string& protocol = handler.protocol; @@ -143,7 +229,8 @@ void ConnectionImpl::open() int port = handler.port; QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); - connector.reset(Connector::create(protocol, version, handler, this)); + theIO().add(); + connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); connector->connect(host, port); -- cgit v1.2.1