diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2010-01-21 06:17:10 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2010-01-21 06:17:10 +0000 |
| commit | df3fe9778d87dd256a2d4c08146d86830ac1e8be (patch) | |
| tree | 23f64b58e3ec94c6024368d1b90910db9d711c84 /cpp/src/qpid/client/SslConnector.cpp | |
| parent | 66266d1f34066c5960ae1eb4f28b8c7758cb46c9 (diff) | |
| download | qpid-python-df3fe9778d87dd256a2d4c08146d86830ac1e8be.tar.gz | |
QPID-1879 Don't use a thread for every new client Connection
- By default the max number of threads now used for network io
is the number of cpus available.
- This can be overridden with the QPID_MAX_IOTHREADS environment
variable or the config file
- The client threads are initialised (via a singleton) when first
used in a Connection::open()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@901550 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SslConnector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 87 |
1 files changed, 28 insertions, 59 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 2b34651fa0..cf6d54d261 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -28,6 +28,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" @@ -50,7 +51,7 @@ using boost::format; using boost::str; -class SslConnector : public Connector, private sys::Runnable +class SslConnector : public Connector { struct Buff; @@ -68,27 +69,26 @@ class SslConnector : public Connector, private sys::Runnable framing::Buffer encode; size_t framesEncoded; std::string identifier; - Bounds* bounds; - + Bounds* bounds; + void writeOne(); void newBuffer(); public: - + Writer(uint16_t maxFrameSize, Bounds*); ~Writer(); void init(std::string id, sys::ssl::SslIO*); void handle(framing::AMQFrame&); void write(sys::ssl::SslIO&); }; - + const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; - sys::Mutex closedLock; + sys::Mutex closedLock; bool closed; - bool joined; sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; @@ -96,20 +96,17 @@ class SslConnector : public Connector, private sys::Runnable framing::OutputHandler* output; Writer writer; - - sys::Thread receiver; sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; - boost::shared_ptr<sys::Poller> poller; + Poller::shared_ptr poller; ~SslConnector(); - void run(); void handleClosed(); bool closeInternal(); - + void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); void writebuff(qpid::sys::ssl::SslIO&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -117,8 +114,6 @@ class SslConnector : public Connector, private sys::Runnable std::string identifier; - ConnectionImpl* impl; - void connect(const std::string& host, int port); void init(); void close(); @@ -133,15 +128,20 @@ class SslConnector : public Connector, private sys::Runnable unsigned int getSSF() { return socket.getKeyLen(); } public: - SslConnector(framing::ProtocolVersion pVersion, + SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; +struct SslConnector::Buff : public SslIO::BufferBase { + Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new SslConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new SslConnector(p, v, s, c); } struct StaticInit { @@ -150,9 +150,9 @@ namespace { SslOptions options; options.parse (0, 0, QPIDC_CONF_FILE, true); if (options.certDbPath.empty()) { - QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); + QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); } else { - initNSS(options); + initNSS(options); Connector::registerFactory("ssl", &create); } } catch (const std::exception& e) { @@ -164,18 +164,18 @@ namespace { } init; } -SslConnector::SslConnector(ProtocolVersion ver, +SslConnector::SslConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), version(ver), initiated(false), closed(true), - joined(true), shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), - impl(cimpl) + poller(p) { QPID_LOG(debug, "SslConnector created for " << version.toString()); //TODO: how do we want to handle socket configuration with ssl? @@ -198,7 +198,6 @@ void SslConnector::connect(const std::string& host, int port){ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; - poller = Poller::shared_ptr(new Poller); aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), @@ -211,11 +210,12 @@ void SslConnector::connect(const std::string& host, int port){ void SslConnector::init(){ Mutex::ScopedLock l(closedLock); - assert(joined); ProtocolInitiation init(version); writeDataBlock(init); - joined = false; - receiver = Thread(this); + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + aio->start(poller); } bool SslConnector::closeInternal() { @@ -224,16 +224,11 @@ bool SslConnector::closeInternal() { if (!closed) { closed = true; aio->queueForDeletion(); - poller->shutdown(); - } - if (!joined && receiver.id() != Thread::current().id()) { - joined = true; - Mutex::ScopedUnlock u(closedLock); - receiver.join(); + socket.close(); } return ret; } - + void SslConnector::close() { closeInternal(); } @@ -267,11 +262,6 @@ void SslConnector::handleClosed() { shutdownHandler->shutdown(); } -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -376,25 +366,4 @@ void SslConnector::eof(SslIO&) { handleClosed(); } -void SslConnector::run(){ - // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); - assert(protect); - try { - Dispatcher d(poller); - - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); - d.run(); - socket.close(); - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - handleClosed(); - } -} - - }} // namespace qpid::client |
