diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/client/SslConnector.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SslConnector.cpp')
-rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 106 |
1 files changed, 72 insertions, 34 deletions
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index c2081a88f2..11707eb3f7 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -30,8 +30,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" -#include "qpid/sys/ssl/SslIo.h" +#include "qpid/sys/AsynchIO.h" #include "qpid/sys/ssl/SslSocket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" #include "qpid/sys/SecuritySettings.h" @@ -72,23 +73,28 @@ class SslConnector : public Connector sys::ssl::SslSocket socket; - sys::ssl::SslIO* aio; + sys::AsynchConnector* connector; + sys::AsynchIO* aio; std::string identifier; Poller::shared_ptr poller; SecuritySettings securitySettings; ~SslConnector(); - void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); - void writebuff(qpid::sys::ssl::SslIO&); + void readbuff(AsynchIO&, AsynchIOBufferBase*); + void writebuff(AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); - void eof(qpid::sys::ssl::SslIO&); - void disconnected(qpid::sys::ssl::SslIO&); + void eof(AsynchIO&); + void disconnected(AsynchIO&); void connect(const std::string& host, const std::string& port); + void connected(const sys::Socket&); + void connectFailed(const std::string& msg); + void close(); void send(framing::AMQFrame& frame); - void abort() {} // TODO: Need to fix for heartbeat timeouts to work + void abort(); + void connectAborted(); void setInputHandler(framing::InputHandler* handler); void setShutdownHandler(sys::ShutdownHandler* handler); @@ -96,10 +102,10 @@ class SslConnector : public Connector framing::OutputHandler* getOutputHandler(); const std::string& getIdentifier() const; const SecuritySettings* getSecuritySettings(); - void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&); + void socketClosed(AsynchIO&, const Socket&); size_t decode(const char* buffer, size_t size); - size_t encode(const char* buffer, size_t size); + size_t encode(char* buffer, size_t size); bool canEncode(); public: @@ -164,32 +170,46 @@ SslConnector::~SslConnector() { close(); } -void SslConnector::connect(const std::string& host, const std::string& port){ +void SslConnector::connect(const std::string& host, const std::string& port) { Mutex::ScopedLock l(lock); assert(closed); - try { - socket.connect(host, port); - } catch (const std::exception& e) { - socket.close(); - throw TransportFailure(e.what()); - } - + connector = AsynchConnector::create( + socket, + host, port, + boost::bind(&SslConnector::connected, this, _1), + boost::bind(&SslConnector::connectFailed, this, _3)); closed = false; - aio = new SslIO(socket, - boost::bind(&SslConnector::readbuff, this, _1, _2), - boost::bind(&SslConnector::eof, this, _1), - boost::bind(&SslConnector::disconnected, this, _1), - boost::bind(&SslConnector::socketClosed, this, _1, _2), - 0, // nobuffs - boost::bind(&SslConnector::writebuff, this, _1)); + + connector->start(poller); +} + +void SslConnector::connected(const Socket&) { + connector = 0; + aio = AsynchIO::create(socket, + boost::bind(&SslConnector::readbuff, this, _1, _2), + boost::bind(&SslConnector::eof, this, _1), + boost::bind(&SslConnector::disconnected, this, _1), + boost::bind(&SslConnector::socketClosed, this, _1, _2), + 0, // nobuffs + boost::bind(&SslConnector::writebuff, this, _1)); aio->createBuffers(maxFrameSize); - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + identifier = str(format("[%1%]") % socket.getFullAddress()); ProtocolInitiation init(version); writeDataBlock(init); aio->start(poller); } +void SslConnector::connectFailed(const std::string& msg) { + connector = 0; + QPID_LOG(warning, "Connect failed: " << msg); + socket.close(); + if (!closed) + closed = true; + if (shutdownHandler) + shutdownHandler->shutdown(); +} + void SslConnector::close() { Mutex::ScopedLock l(lock); if (!closed) { @@ -199,13 +219,31 @@ void SslConnector::close() { } } -void SslConnector::socketClosed(SslIO&, const SslSocket&) { +void SslConnector::socketClosed(AsynchIO&, const Socket&) { if (aio) aio->queueForDeletion(); if (shutdownHandler) shutdownHandler->shutdown(); } +void SslConnector::connectAborted() { + connector->stop(); + connectFailed("Connection timedout"); +} + +void SslConnector::abort() { + // Can't abort a closed connection + if (!closed) { + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&SslConnector::eof, this, _1)); + } else if (connector) { + // We're still connecting + connector->requestCallback(boost::bind(&SslConnector::connectAborted, this)); + } + } +} + void SslConnector::setInputHandler(InputHandler* handler){ input = handler; } @@ -255,7 +293,7 @@ void SslConnector::send(AMQFrame& frame) { } } -void SslConnector::writebuff(SslIO& /*aio*/) +void SslConnector::writebuff(AsynchIO& /*aio*/) { // It's possible to be disconnected and be writable if (closed) @@ -265,7 +303,7 @@ void SslConnector::writebuff(SslIO& /*aio*/) return; } - SslIO::BufferBase* buffer = aio->getQueuedBuffer(); + AsynchIOBufferBase* buffer = aio->getQueuedBuffer(); if (buffer) { size_t encoded = encode(buffer->bytes, buffer->byteCount); @@ -285,9 +323,9 @@ bool SslConnector::canEncode() } // Called in IO thread. -size_t SslConnector::encode(const char* buffer, size_t size) +size_t SslConnector::encode(char* buffer, size_t size) { - framing::Buffer out(const_cast<char*>(buffer), size); + framing::Buffer out(buffer, size); size_t bytesWritten(0); { Mutex::ScopedLock l(lock); @@ -304,7 +342,7 @@ size_t SslConnector::encode(const char* buffer, size_t size) return bytesWritten; } -void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) +void SslConnector::readbuff(AsynchIO& aio, AsynchIOBufferBase* buff) { int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount); // TODO: unreading needs to go away, and when we can cope @@ -343,7 +381,7 @@ size_t SslConnector::decode(const char* buffer, size_t size) } void SslConnector::writeDataBlock(const AMQDataBlock& data) { - SslIO::BufferBase* buff = aio->getQueuedBuffer(); + AsynchIOBufferBase* buff = aio->getQueuedBuffer(); assert(buff); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); @@ -351,11 +389,11 @@ void SslConnector::writeDataBlock(const AMQDataBlock& data) { aio->queueWrite(buff); } -void SslConnector::eof(SslIO&) { +void SslConnector::eof(AsynchIO&) { close(); } -void SslConnector::disconnected(SslIO&) { +void SslConnector::disconnected(AsynchIO&) { close(); socketClosed(*aio, socket); } |