diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-05-28 05:18:09 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-05-28 05:18:09 +0000 |
| commit | 4737a65543783790cd1bd3be66ef66956746e12e (patch) | |
| tree | 45b9f7f2469255fddfa74822fc2bc2ef8c678d1e /cpp/src/qpid/client/ConnectionImpl.cpp | |
| parent | 0a6cb55f7a9365e880587f87befabb996262f8d2 (diff) | |
| download | qpid-python-4737a65543783790cd1bd3be66ef66956746e12e.tar.gz | |
QPID-1879 Don't use a thread for every new client Connection
- By default the max number of threads now used for network io
is the number of cpus available.
- This can be overridden with the QPID_MAX_IOTHREADS environment
variable or the config file
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@779435 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ConnectionImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 92 |
1 files changed, 88 insertions, 4 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 6639f92324..ccaa8c0b87 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,7 +18,13 @@ * under the License. * */ + #include "ConnectionImpl.h" + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" @@ -28,11 +34,16 @@ #include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/Options.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/shared_ptr.hpp> #include <limits> +#include <vector> namespace qpid { namespace client { @@ -42,7 +53,10 @@ using namespace qpid::framing::connection; using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -// Get timer singleton +namespace { +// Maybe should amalgamate the singletons into a single client singleton + +// Get timer singleton Timer& theTimer() { static Mutex timerInitLock; ScopedLock<Mutex> l(timerInitLock); @@ -51,6 +65,73 @@ Timer& theTimer() { return t; } +struct IOThreadOptions : public qpid::Options { + int maxIOThreads; + + IOThreadOptions(int c) : + Options("IO threading options"), + maxIOThreads(c) + { + addOptions() + ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use"); + } +}; + +// IO threads +class IOThread { + int maxIOThreads; + int ioThreads; + int connections; + Mutex threadLock; + std::vector<Thread> t; + Poller::shared_ptr poller_; + +public: + void add() { + ScopedLock<Mutex> l(threadLock); + ++connections; + if (ioThreads < maxIOThreads) { + QPID_LOG(debug, "Created IO thread: " << ioThreads); + ++ioThreads; + t.push_back( Thread(poller_.get()) ); + } + } + + void sub() { + ScopedLock<Mutex> l(threadLock); + --connections; + } + + Poller::shared_ptr poller() const { + return poller_; + } + + // Here is where the maximum number of threads is set + IOThread(int c) : + ioThreads(0), + connections(0), + poller_(new Poller) + { + IOThreadOptions options(c); + options.parse(0, 0, QPIDC_CONF_FILE, true); + maxIOThreads = (options.maxIOThreads != -1) ? + options.maxIOThreads : 1; + } + + // We can't destroy threads one-by-one as the only + // control we have is to shutdown the whole lot + // and we can't do that before we're unloaded as we can't + // restart the Poller after shutting it down + ~IOThread() { + poller_->shutdown(); + for (int i=0; i<ioThreads; ++i) { + t[i].join(); + } + } +}; + +static IOThread io(SystemInfo::concurrency()); + class HeartbeatTask : public TimerTask { TimeoutHandler& timeout; @@ -67,6 +148,8 @@ public: {} }; +} + ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), @@ -90,6 +173,7 @@ ConnectionImpl::~ConnectionImpl() { // is running. failover.reset(); if (connector) connector->close(); + io.sub(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) @@ -126,7 +210,6 @@ bool ConnectionImpl::isOpen() const return handler.isOpen(); } - void ConnectionImpl::open() { const std::string& protocol = handler.protocol; @@ -134,7 +217,8 @@ void ConnectionImpl::open() int port = handler.port; QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); - connector.reset(Connector::create(protocol, version, handler, this)); + io.add(); + connector.reset(Connector::create(protocol, io.poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); connector->connect(host, port); @@ -238,7 +322,7 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() { return handler; } - + std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } |
