diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Connection.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.h | 13 |
4 files changed, 23 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index a476f2d880..ee543e20d2 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -23,7 +23,6 @@ */ #include <map> #include <string> -#include "ConnectionImpl.h" #include "qpid/client/Session.h" namespace qpid { diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 6dca4dcf21..f32e21c389 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -19,6 +19,7 @@ * */ #include "ConnectionImpl.h" +#include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" @@ -38,7 +39,7 @@ using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - connector(v, settings, this), + connector(new Connector(v, settings, this)), version(v), isClosed(true),//closed until successfully opened isClosing(false) @@ -48,9 +49,9 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti handler.out = boost::bind(&Connector::send, boost::ref(connector), _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, NORMAL, std::string()); - connector.setInputHandler(&handler); - connector.setTimeoutHandler(this); - connector.setShutdownHandler(this); + connector->setInputHandler(&handler); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); //only set error handler once open handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); @@ -60,7 +61,7 @@ ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - connector.close(); + connector->close(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) @@ -97,8 +98,8 @@ bool ConnectionImpl::isOpen() const void ConnectionImpl::open(const std::string& host, int port) { QPID_LOG(info, "Connecting to " << host << ":" << port); - connector.connect(host, port); - connector.init(); + connector->connect(host, port); + connector->init(); handler.waitForOpen(); Mutex::ScopedLock l(lock); isClosed = false; @@ -112,7 +113,7 @@ void ConnectionImpl::idleIn() void ConnectionImpl::idleOut() { AMQFrame frame(in_place<AMQHeartbeatBody>()); - connector.send(frame); + connector->send(frame); } void ConnectionImpl::close() @@ -130,8 +131,8 @@ void ConnectionImpl::close() template <class F> void ConnectionImpl::closeInternal(const F& f) { isClosed = true; - connector.close(); - for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) { + connector->close(); + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { boost::shared_ptr<SessionImpl> s = i->second.lock(); if (s) f(s); } diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index b02dda5af7..98fb212c3e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -24,7 +24,6 @@ #include "Bounds.h" #include "ConnectionHandler.h" -#include "Connector.h" #include "qpid/framing/FrameHandler.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" @@ -33,11 +32,13 @@ #include <map> #include <boost/shared_ptr.hpp> #include <boost/weak_ptr.hpp> +#include <boost/scoped_ptr.hpp> #include <boost/enable_shared_from_this.hpp> namespace qpid { namespace client { +class Connector; class ConnectionSettings; class SessionImpl; @@ -52,7 +53,7 @@ class ConnectionImpl : public Bounds, SessionMap sessions; ConnectionHandler handler; - Connector connector; + boost::scoped_ptr<Connector> connector; framing::ProtocolVersion version; sys::Mutex lock; bool isClosed; diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index b35e77c726..c7f5be0936 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -34,13 +34,18 @@ #include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" #include "qpid/sys/Time.h" -#include "qpid/sys/AsynchIO.h" #include <queue> #include <boost/weak_ptr.hpp> #include <boost/shared_ptr.hpp> namespace qpid { + +namespace sys { +class Poller; +class AsynchIO; +class AsynchIOBufferBase; +} namespace client { @@ -56,7 +61,7 @@ class Connector : public framing::OutputHandler, /** Batch up frames for writing to aio. */ class Writer : public framing::FrameHandler { - typedef sys::AsynchIO::BufferBase BufferBase; + typedef sys::AsynchIOBufferBase BufferBase; typedef std::vector<framing::AMQFrame> Frames; const uint16_t maxFrameSize; @@ -109,7 +114,7 @@ class Connector : public framing::OutputHandler, sys::Socket socket; sys::AsynchIO* aio; - sys::Poller::shared_ptr poller; + boost::shared_ptr<sys::Poller> poller; void checkIdle(ssize_t status); void setSocketTimeout(); @@ -118,7 +123,7 @@ class Connector : public framing::OutputHandler, void handleClosed(); bool closeInternal(); - void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*); + void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); |
