summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/Connector.cpp')
-rw-r--r--cpp/src/qpid/client/Connector.cpp96
1 files changed, 36 insertions, 60 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 1558f292aa..946bf0138d 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -27,9 +27,11 @@
#include "qpid/sys/Codec.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/Socket.h"
#include "qpid/sys/SecurityLayer.h"
#include "qpid/Msg.h"
@@ -51,21 +53,23 @@ using boost::str;
// Stuff for the registry of protocol connectors (maybe should be moved to its own file)
namespace {
typedef std::map<std::string, Connector::Factory*> ProtocolRegistry;
-
+
ProtocolRegistry& theProtocolRegistry() {
static ProtocolRegistry protocolRegistry;
-
+
return protocolRegistry;
}
}
-Connector* Connector::create(const std::string& proto, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
+Connector* Connector::create(const std::string& proto,
+ Poller::shared_ptr p,
+ framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c)
{
ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto);
if (i==theProtocolRegistry().end()) {
throw Exception(QPID_MSG("Unknown protocol: " << proto));
}
- return (i->second)(v, s, c);
+ return (i->second)(p, v, s, c);
}
void Connector::registerFactory(const std::string& proto, Factory* connectorFactory)
@@ -81,7 +85,7 @@ void Connector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>)
{
}
-class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
+class TCPConnector : public Connector, public sys::Codec
{
typedef std::deque<framing::AMQFrame> Frames;
struct Buff;
@@ -93,7 +97,7 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
size_t lastEof; // Position after last EOF in frames
uint64_t currentSize;
Bounds* bounds;
-
+
framing::ProtocolVersion version;
bool initiated;
bool closed;
@@ -104,28 +108,25 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
framing::InitiationHandler* initialiser;
framing::OutputHandler* output;
- sys::Thread receiver;
-
sys::Socket socket;
sys::AsynchIO* aio;
std::string identifier;
- boost::shared_ptr<sys::Poller> poller;
+ Poller::shared_ptr poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~TCPConnector();
- void run();
void handleClosed();
bool closeInternal();
-
+
bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
boost::weak_ptr<ConnectionImpl> impl;
-
+
void connect(const std::string& host, int port);
void init();
void close();
@@ -142,18 +143,23 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool canEncode();
-
public:
- TCPConnector(framing::ProtocolVersion pVersion,
- const ConnectionSettings&,
+ TCPConnector(Poller::shared_ptr,
+ framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
ConnectionImpl*);
};
+struct TCPConnector::Buff : public AsynchIO::BufferBase {
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
+ ~Buff() { delete [] bytes;}
+};
+
// Static constructor which registers connector here
namespace {
- Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
- return new TCPConnector(v, s, c);
+ Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new TCPConnector(p, v, s, c);
}
struct StaticInit {
@@ -163,19 +169,21 @@ namespace {
} init;
}
-TCPConnector::TCPConnector(ProtocolVersion ver,
+TCPConnector::TCPConnector(Poller::shared_ptr p,
+ ProtocolVersion ver,
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
lastEof(0),
currentSize(0),
bounds(cimpl),
- version(ver),
+ version(ver),
initiated(false),
closed(true),
joined(true),
shutdownHandler(0),
aio(0),
+ poller(p),
impl(cimpl->shared_from_this())
{
QPID_LOG(debug, "TCPConnector created for " << version.toString());
@@ -197,7 +205,6 @@ void TCPConnector::connect(const std::string& host, int port){
}
identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
- poller = Poller::shared_ptr(new Poller);
aio = AsynchIO::create(socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
@@ -214,28 +221,24 @@ void TCPConnector::init(){
ProtocolInitiation init(version);
writeDataBlock(init);
joined = false;
- receiver = Thread(this);
+ for (int i = 0; i < 32; i++) {
+ aio->queueReadBuffer(new Buff(maxFrameSize));
+ }
+
+ aio->start(poller);
}
bool TCPConnector::closeInternal() {
- bool ret;
- {
Mutex::ScopedLock l(lock);
- ret = !closed;
+ bool ret = !closed;
if (!closed) {
closed = true;
aio->queueForDeletion();
- poller->shutdown();
- }
- if (joined || receiver.id() == Thread::current().id()) {
- return ret;
- }
- joined = true;
+ socket.close();
}
- receiver.join();
return ret;
}
-
+
void TCPConnector::close() {
closeInternal();
}
@@ -285,18 +288,13 @@ void TCPConnector::handleClosed() {
shutdownHandler->shutdown();
}
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
void TCPConnector::writebuff(AsynchIO& /*aio*/)
{
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
if (codec->canEncode()) {
std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
-
+
size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
buffer->dataStart = 0;
@@ -382,28 +380,6 @@ void TCPConnector::eof(AsynchIO&) {
handleClosed();
}
-void TCPConnector::run() {
- // Keep the connection impl in memory until run() completes.
- boost::shared_ptr<ConnectionImpl> protect = impl.lock();
- assert(protect);
- try {
- Dispatcher d(poller);
-
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
-
- aio->start(poller);
- d.run();
- } catch (const std::exception& e) {
- QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what()));
- handleClosed();
- }
- try {
- socket.close();
- } catch (const std::exception&) {}
-}
-
void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
securityLayer = sl;