diff options
Diffstat (limited to 'cpp/src/qpid/client/RdmaConnector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 61 |
1 files changed, 47 insertions, 14 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index f6bedf63f5..ad85104f3a 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -26,7 +26,6 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/rdma/RdmaIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" @@ -49,7 +48,7 @@ using namespace qpid::framing; using boost::format; using boost::str; - class RdmaConnector : public Connector, public sys::Codec + class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable { struct Buff; @@ -61,12 +60,13 @@ using boost::str; Frames frames; size_t lastEof; // Position after last EOF in frames uint64_t currentSize; - Bounds* bounds; - + Bounds* bounds; + + framing::ProtocolVersion version; bool initiated; - sys::Mutex pollingLock; + sys::Mutex pollingLock; bool polling; bool joined; @@ -75,12 +75,15 @@ using boost::str; framing::InitiationHandler* initialiser; framing::OutputHandler* output; + sys::Thread receiver; + Rdma::AsynchIO* aio; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); + void run(); void handleClosed(); bool closeInternal(); @@ -98,7 +101,7 @@ using boost::str; std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void close(); void send(framing::AMQFrame& frame); @@ -116,16 +119,15 @@ using boost::str; bool canEncode(); public: - RdmaConnector(Poller::shared_ptr, - framing::ProtocolVersion pVersion, + RdmaConnector(framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; // Static constructor which registers connector here namespace { - Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new RdmaConnector(p, v, s, c); + Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new RdmaConnector(v, s, c); } struct StaticInit { @@ -137,8 +139,7 @@ namespace { } -RdmaConnector::RdmaConnector(Poller::shared_ptr p, - ProtocolVersion ver, +RdmaConnector::RdmaConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -151,7 +152,6 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, joined(true), shutdownHandler(0), aio(0), - poller(p), impl(cimpl) { QPID_LOG(debug, "RdmaConnector created for " << version); @@ -165,6 +165,7 @@ void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); assert(joined); + poller = Poller::shared_ptr(new Poller); // This stuff needs to abstracted out of here to a platform specific file ::addrinfo *res; @@ -189,6 +190,7 @@ void RdmaConnector::connect(const std::string& host, int port){ polling = true; joined = false; + receiver = Thread(this); } // The following only gets run when connected @@ -224,14 +226,23 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv bool RdmaConnector::closeInternal() { bool ret; + { Mutex::ScopedLock l(pollingLock); ret = polling; if (polling) { polling = false; + poller->shutdown(); } + if (joined || receiver.id() == Thread::current().id()) { + return ret; + } + joined = true; + } + + receiver.join(); return ret; } - + void RdmaConnector::close() { closeInternal(); } @@ -355,6 +366,28 @@ void RdmaConnector::eof(Rdma::AsynchIO&) { handleClosed(); } +void RdmaConnector::run(){ + // Keep the connection impl in memory until run() completes. + //GRS: currently the ConnectionImpls destructor is where the Io thread is joined + //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); + //assert(protect); + try { + Dispatcher d(poller); + + //aio->start(poller); + d.run(); + //aio->queueForDeletion(); + } catch (const std::exception& e) { + { + // We're no longer polling + Mutex::ScopedLock l(pollingLock); + polling = false; + } + QPID_LOG(error, e.what()); + handleClosed(); + } +} + void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; |
