diff options
Diffstat (limited to 'cpp/src/qpid/client/SslConnector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 82 |
1 files changed, 29 insertions, 53 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index 7b0bcc6f1e..8194371b8a 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -28,6 +28,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" @@ -50,7 +51,7 @@ using boost::format; using boost::str; -class SslConnector : public Connector, private sys::Runnable +class SslConnector : public Connector { struct Buff; @@ -68,25 +69,25 @@ class SslConnector : public Connector, private sys::Runnable framing::Buffer encode; size_t framesEncoded; std::string identifier; - Bounds* bounds; - + Bounds* bounds; + void writeOne(); void newBuffer(); public: - + Writer(uint16_t maxFrameSize, Bounds*); ~Writer(); void init(std::string id, sys::ssl::SslIO*); void handle(framing::AMQFrame&); void write(sys::ssl::SslIO&); }; - + const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; - sys::Mutex closedLock; + sys::Mutex closedLock; bool closed; bool joined; @@ -96,20 +97,17 @@ class SslConnector : public Connector, private sys::Runnable framing::OutputHandler* output; Writer writer; - - sys::Thread receiver; sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; - boost::shared_ptr<sys::Poller> poller; + Poller::shared_ptr poller; ~SslConnector(); - void run(); void handleClosed(); bool closeInternal(); - + void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); void writebuff(qpid::sys::ssl::SslIO&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -118,7 +116,7 @@ class SslConnector : public Connector, private sys::Runnable std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void init(); void close(); @@ -132,15 +130,20 @@ class SslConnector : public Connector, private sys::Runnable const std::string& getIdentifier() const; public: - SslConnector(framing::ProtocolVersion pVersion, + SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; +struct SslConnector::Buff : public SslIO::BufferBase { + Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new SslConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new SslConnector(p, v, s, c); } struct StaticInit { @@ -149,9 +152,9 @@ namespace { SslOptions options; options.parse (0, 0, QPIDC_CONF_FILE, true); if (options.certDbPath.empty()) { - QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); + QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); } else { - initNSS(options); + initNSS(options); Connector::registerFactory("ssl", &create); } } catch (const std::exception& e) { @@ -163,7 +166,8 @@ namespace { } init; } -SslConnector::SslConnector(ProtocolVersion ver, +SslConnector::SslConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -174,6 +178,7 @@ SslConnector::SslConnector(ProtocolVersion ver, shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), + poller(p), impl(cimpl) { QPID_LOG(debug, "SslConnector created for " << version.toString()); @@ -197,7 +202,6 @@ void SslConnector::connect(const std::string& host, int port){ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; - poller = Poller::shared_ptr(new Poller); aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), @@ -214,7 +218,10 @@ void SslConnector::init(){ ProtocolInitiation init(version); writeDataBlock(init); joined = false; - receiver = Thread(this); + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + aio->start(poller); } bool SslConnector::closeInternal() { @@ -223,16 +230,11 @@ bool SslConnector::closeInternal() { if (!closed) { closed = true; aio->queueForDeletion(); - poller->shutdown(); - } - if (!joined && receiver.id() != Thread::current().id()) { - joined = true; - Mutex::ScopedUnlock u(closedLock); - receiver.join(); + socket.close(); } return ret; } - + void SslConnector::close() { closeInternal(); } @@ -266,11 +268,6 @@ void SslConnector::handleClosed() { shutdownHandler->shutdown(); } -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -375,25 +372,4 @@ void SslConnector::eof(SslIO&) { handleClosed(); } -void SslConnector::run(){ - // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); - assert(protect); - try { - Dispatcher d(poller); - - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); - d.run(); - socket.close(); - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - handleClosed(); - } -} - - }} // namespace qpid::client |
