diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2010-01-21 06:17:10 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2010-01-21 06:17:10 +0000 |
| commit | df3fe9778d87dd256a2d4c08146d86830ac1e8be (patch) | |
| tree | 23f64b58e3ec94c6024368d1b90910db9d711c84 /cpp/src/qpid/client/TCPConnector.cpp | |
| parent | 66266d1f34066c5960ae1eb4f28b8c7758cb46c9 (diff) | |
| download | qpid-python-df3fe9778d87dd256a2d4c08146d86830ac1e8be.tar.gz | |
QPID-1879 Don't use a thread for every new client Connection
- By default the max number of threads now used for network io
is the number of cpus available.
- This can be overridden with the QPID_MAX_IOTHREADS environment
variable or the config file
- The client threads are initialised (via a singleton) when first
used in a Connection::open()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901550 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/TCPConnector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 83 |
1 files changed, 28 insertions, 55 deletions
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index 9369dd2ef4..2de139d5df 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -27,6 +27,7 @@ #include "qpid/sys/Codec.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" @@ -45,10 +46,15 @@ using namespace qpid::framing; using boost::format; using boost::str; +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new TCPConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(p, v, s, c); } struct StaticInit { @@ -58,25 +64,20 @@ namespace { } init; } -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - -TCPConnector::TCPConnector(ProtocolVersion ver, - const ConnectionSettings& settings, - ConnectionImpl* cimpl) +TCPConnector::TCPConnector(Poller::shared_ptr p, + ProtocolVersion ver, + const ConnectionSettings& settings, + ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), lastEof(0), currentSize(0), bounds(cimpl), - version(ver), + version(ver), initiated(false), closed(true), - joined(true), shutdownHandler(0), aio(0), - impl(cimpl->shared_from_this()) + poller(p) { QPID_LOG(debug, "TCPConnector created for " << version.toString()); settings.configureSocket(socket); @@ -89,16 +90,13 @@ TCPConnector::~TCPConnector() { void TCPConnector::connect(const std::string& host, int port) { Mutex::ScopedLock l(lock); assert(closed); - assert(joined); - poller = Poller::shared_ptr(new Poller); - AsynchConnector* c = - AsynchConnector::create(socket, - host, port, - boost::bind(&TCPConnector::connected, this, _1), - boost::bind(&TCPConnector::connectFailed, this, _3)); + AsynchConnector* c = AsynchConnector::create( + socket, + host, port, + boost::bind(&TCPConnector::connected, this, _1), + boost::bind(&TCPConnector::connectFailed, this, _3)); closed = false; - joined = false; - receiver = Thread(this); + c->start(poller); } @@ -113,38 +111,31 @@ void TCPConnector::connected(const Socket&) { for (int i = 0; i < 32; i++) { aio->queueReadBuffer(new Buff(maxFrameSize)); } - aio->start(poller); identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); ProtocolInitiation init(version); writeDataBlock(init); + + aio->start(poller); } void TCPConnector::connectFailed(const std::string& msg) { QPID_LOG(warning, "Connecting failed: " << msg); - closed = true; - poller->shutdown(); - closeInternal(); - if (shutdownHandler) + socket.close(); + if (!closed && shutdownHandler) { + closed = true; shutdownHandler->shutdown(); + } } bool TCPConnector::closeInternal() { - bool ret; - { Mutex::ScopedLock l(lock); - ret = !closed; + bool ret = !closed; if (!closed) { closed = true; aio->queueForDeletion(); - poller->shutdown(); - } - if (joined || receiver.id() == Thread::current().id()) { - return ret; - } - joined = true; + socket.close(); } - receiver.join(); return ret; } @@ -301,28 +292,10 @@ void TCPConnector::eof(AsynchIO&) { handleClosed(); } -void TCPConnector::run() { - // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl.lock(); - assert(protect); - try { - Dispatcher d(poller); - - d.run(); - } catch (const std::exception& e) { - QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); - handleClosed(); - } - try { - socket.close(); - } catch (const std::exception&) {} -} - void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; securityLayer->init(this); } - }} // namespace qpid::client |
