diff options
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 96 |
1 files changed, 36 insertions, 60 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 1558f292aa..946bf0138d 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -27,9 +27,11 @@ #include "qpid/sys/Codec.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/Socket.h" #include "qpid/sys/SecurityLayer.h" #include "qpid/Msg.h" @@ -51,21 +53,23 @@ using boost::str; // Stuff for the registry of protocol connectors (maybe should be moved to its own file) namespace { typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; - + ProtocolRegistry& theProtocolRegistry() { static ProtocolRegistry protocolRegistry; - + return protocolRegistry; } } -Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) +Connector* Connector::create(const std::string& proto, + Poller::shared_ptr p, + framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); if (i==theProtocolRegistry().end()) { throw Exception(QPID_MSG("Unknown protocol: " << proto)); } - return (i->second)(v, s, c); + return (i->second)(p, v, s, c); } void Connector::registerFactory(const std::string& proto, Factory* connectorFactory) @@ -81,7 +85,7 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>) { } -class TCPConnector : public Connector, public sys::Codec, private sys::Runnable +class TCPConnector : public Connector, public sys::Codec { typedef std::deque<framing::AMQFrame> Frames; struct Buff; @@ -93,7 +97,7 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable size_t lastEof; // Position after last EOF in frames uint64_t currentSize; Bounds* bounds; - + framing::ProtocolVersion version; bool initiated; bool closed; @@ -104,28 +108,25 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable framing::InitiationHandler* initialiser; framing::OutputHandler* output; - sys::Thread receiver; - sys::Socket socket; sys::AsynchIO* aio; std::string identifier; - boost::shared_ptr<sys::Poller> poller; + Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~TCPConnector(); - void run(); void handleClosed(); bool closeInternal(); - + bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); boost::weak_ptr<ConnectionImpl> impl; - + void connect(const std::string& host, int port); void init(); void close(); @@ -142,18 +143,23 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool canEncode(); - public: - TCPConnector(framing::ProtocolVersion pVersion, - const ConnectionSettings&, + TCPConnector(Poller::shared_ptr, + framing::ProtocolVersion pVersion, + const ConnectionSettings&, ConnectionImpl*); }; +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new TCPConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(p, v, s, c); } struct StaticInit { @@ -163,19 +169,21 @@ namespace { } init; } -TCPConnector::TCPConnector(ProtocolVersion ver, +TCPConnector::TCPConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), lastEof(0), currentSize(0), bounds(cimpl), - version(ver), + version(ver), initiated(false), closed(true), joined(true), shutdownHandler(0), aio(0), + poller(p), impl(cimpl->shared_from_this()) { QPID_LOG(debug, "TCPConnector created for " << version.toString()); @@ -197,7 +205,6 @@ void TCPConnector::connect(const std::string& host, int port){ } identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); - poller = Poller::shared_ptr(new Poller); aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), @@ -214,28 +221,24 @@ void TCPConnector::init(){ ProtocolInitiation init(version); writeDataBlock(init); joined = false; - receiver = Thread(this); + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + + aio->start(poller); } bool TCPConnector::closeInternal() { - bool ret; - { Mutex::ScopedLock l(lock); - ret = !closed; + bool ret = !closed; if (!closed) { closed = true; aio->queueForDeletion(); - poller->shutdown(); - } - if (joined || receiver.id() == Thread::current().id()) { - return ret; - } - joined = true; + socket.close(); } - receiver.join(); return ret; } - + void TCPConnector::close() { closeInternal(); } @@ -285,18 +288,13 @@ void TCPConnector::handleClosed() { shutdownHandler->shutdown(); } -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - void TCPConnector::writebuff(AsynchIO& /*aio*/) { Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; if (codec->canEncode()) { std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; @@ -382,28 +380,6 @@ void TCPConnector::eof(AsynchIO&) { handleClosed(); } -void TCPConnector::run() { - // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl.lock(); - assert(protect); - try { - Dispatcher d(poller); - - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); - d.run(); - } catch (const std::exception& e) { - QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); - handleClosed(); - } - try { - socket.close(); - } catch (const std::exception&) {} -} - void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; |
