summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Connection.h1
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp21
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h5
-rw-r--r--cpp/src/qpid/client/Connector.h13
4 files changed, 23 insertions, 17 deletions
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index a476f2d880..ee543e20d2 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -23,7 +23,6 @@
*/
#include <map>
#include <string>
-#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
namespace qpid {
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 6dca4dcf21..f32e21c389 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -19,6 +19,7 @@
*
*/
#include "ConnectionImpl.h"
+#include "Connector.h"
#include "ConnectionSettings.h"
#include "SessionImpl.h"
@@ -38,7 +39,7 @@ using namespace qpid::framing::connection;//for connection error codes
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
- connector(v, settings, this),
+ connector(new Connector(v, settings, this)),
version(v),
isClosed(true),//closed until successfully opened
isClosing(false)
@@ -48,9 +49,9 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti
handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
NORMAL, std::string());
- connector.setInputHandler(&handler);
- connector.setTimeoutHandler(this);
- connector.setShutdownHandler(this);
+ connector->setInputHandler(&handler);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
//only set error handler once open
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
@@ -60,7 +61,7 @@ ConnectionImpl::~ConnectionImpl() {
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- connector.close();
+ connector->close();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
@@ -97,8 +98,8 @@ bool ConnectionImpl::isOpen() const
void ConnectionImpl::open(const std::string& host, int port)
{
QPID_LOG(info, "Connecting to " << host << ":" << port);
- connector.connect(host, port);
- connector.init();
+ connector->connect(host, port);
+ connector->init();
handler.waitForOpen();
Mutex::ScopedLock l(lock);
isClosed = false;
@@ -112,7 +113,7 @@ void ConnectionImpl::idleIn()
void ConnectionImpl::idleOut()
{
AMQFrame frame(in_place<AMQHeartbeatBody>());
- connector.send(frame);
+ connector->send(frame);
}
void ConnectionImpl::close()
@@ -130,8 +131,8 @@ void ConnectionImpl::close()
template <class F> void ConnectionImpl::closeInternal(const F& f) {
isClosed = true;
- connector.close();
- for (SessionMap::iterator i=sessions.begin(); i != sessions.end(); ++i) {
+ connector->close();
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
boost::shared_ptr<SessionImpl> s = i->second.lock();
if (s) f(s);
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index b02dda5af7..98fb212c3e 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -24,7 +24,6 @@
#include "Bounds.h"
#include "ConnectionHandler.h"
-#include "Connector.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
@@ -33,11 +32,13 @@
#include <map>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
namespace qpid {
namespace client {
+class Connector;
class ConnectionSettings;
class SessionImpl;
@@ -52,7 +53,7 @@ class ConnectionImpl : public Bounds,
SessionMap sessions;
ConnectionHandler handler;
- Connector connector;
+ boost::scoped_ptr<Connector> connector;
framing::ProtocolVersion version;
sys::Mutex lock;
bool isClosed;
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index b35e77c726..c7f5be0936 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -34,13 +34,18 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/Time.h"
-#include "qpid/sys/AsynchIO.h"
#include <queue>
#include <boost/weak_ptr.hpp>
#include <boost/shared_ptr.hpp>
namespace qpid {
+
+namespace sys {
+class Poller;
+class AsynchIO;
+class AsynchIOBufferBase;
+}
namespace client {
@@ -56,7 +61,7 @@ class Connector : public framing::OutputHandler,
/** Batch up frames for writing to aio. */
class Writer : public framing::FrameHandler {
- typedef sys::AsynchIO::BufferBase BufferBase;
+ typedef sys::AsynchIOBufferBase BufferBase;
typedef std::vector<framing::AMQFrame> Frames;
const uint16_t maxFrameSize;
@@ -109,7 +114,7 @@ class Connector : public framing::OutputHandler,
sys::Socket socket;
sys::AsynchIO* aio;
- sys::Poller::shared_ptr poller;
+ boost::shared_ptr<sys::Poller> poller;
void checkIdle(ssize_t status);
void setSocketTimeout();
@@ -118,7 +123,7 @@ class Connector : public framing::OutputHandler,
void handleClosed();
bool closeInternal();
- void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIO::BufferBase*);
+ void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);