diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-05-28 05:18:09 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-05-28 05:18:09 +0000 |
| commit | 4737a65543783790cd1bd3be66ef66956746e12e (patch) | |
| tree | 45b9f7f2469255fddfa74822fc2bc2ef8c678d1e /cpp/src/qpid/client/RdmaConnector.cpp | |
| parent | 0a6cb55f7a9365e880587f87befabb996262f8d2 (diff) | |
| download | qpid-python-4737a65543783790cd1bd3be66ef66956746e12e.tar.gz | |
QPID-1879 Don't use a thread for every new client Connection
- By default the max number of threads now used for network io
is the number of cpus available.
- This can be overridden with the QPID_MAX_IOTHREADS environment
variable or the config file
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@779435 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/RdmaConnector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 61 |
1 files changed, 14 insertions, 47 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index ad85104f3a..f6bedf63f5 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -26,6 +26,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/rdma/RdmaIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" @@ -48,7 +49,7 @@ using namespace qpid::framing; using boost::format; using boost::str; - class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable + class RdmaConnector : public Connector, public sys::Codec { struct Buff; @@ -60,13 +61,12 @@ using boost::str; Frames frames; size_t lastEof; // Position after last EOF in frames uint64_t currentSize; - Bounds* bounds; - - + Bounds* bounds; + framing::ProtocolVersion version; bool initiated; - sys::Mutex pollingLock; + sys::Mutex pollingLock; bool polling; bool joined; @@ -75,15 +75,12 @@ using boost::str; framing::InitiationHandler* initialiser; framing::OutputHandler* output; - sys::Thread receiver; - Rdma::AsynchIO* aio; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); - void run(); void handleClosed(); bool closeInternal(); @@ -101,7 +98,7 @@ using boost::str; std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void close(); void send(framing::AMQFrame& frame); @@ -119,15 +116,16 @@ using boost::str; bool canEncode(); public: - RdmaConnector(framing::ProtocolVersion pVersion, + RdmaConnector(Poller::shared_ptr, + framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new RdmaConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new RdmaConnector(p, v, s, c); } struct StaticInit { @@ -139,7 +137,8 @@ namespace { } -RdmaConnector::RdmaConnector(ProtocolVersion ver, +RdmaConnector::RdmaConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -152,6 +151,7 @@ RdmaConnector::RdmaConnector(ProtocolVersion ver, joined(true), shutdownHandler(0), aio(0), + poller(p), impl(cimpl) { QPID_LOG(debug, "RdmaConnector created for " << version); @@ -165,7 +165,6 @@ void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); assert(joined); - poller = Poller::shared_ptr(new Poller); // This stuff needs to abstracted out of here to a platform specific file ::addrinfo *res; @@ -190,7 +189,6 @@ void RdmaConnector::connect(const std::string& host, int port){ polling = true; joined = false; - receiver = Thread(this); } // The following only gets run when connected @@ -226,23 +224,14 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv bool RdmaConnector::closeInternal() { bool ret; - { Mutex::ScopedLock l(pollingLock); ret = polling; if (polling) { polling = false; - poller->shutdown(); } - if (joined || receiver.id() == Thread::current().id()) { - return ret; - } - joined = true; - } - - receiver.join(); return ret; } - + void RdmaConnector::close() { closeInternal(); } @@ -366,28 +355,6 @@ void RdmaConnector::eof(Rdma::AsynchIO&) { handleClosed(); } -void RdmaConnector::run(){ - // Keep the connection impl in memory until run() completes. - //GRS: currently the ConnectionImpls destructor is where the Io thread is joined - //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); - //assert(protect); - try { - Dispatcher d(poller); - - //aio->start(poller); - d.run(); - //aio->queueForDeletion(); - } catch (const std::exception& e) { - { - // We're no longer polling - Mutex::ScopedLock l(pollingLock); - polling = false; - } - QPID_LOG(error, e.what()); - handleClosed(); - } -} - void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; |
