summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-10-28 22:02:53 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-10-28 22:02:53 +0000
commit54d70e027abc1e3d0450d19188ac4ca3d07f4002 (patch)
tree0253fb816e0697c0478090935752912421247651 /cpp
parentc15852b63c41477abc0e4c145a95d10212a94e43 (diff)
downloadqpid-python-54d70e027abc1e3d0450d19188ac4ca3d07f4002.tar.gz
Make federation work over Rdma links
- Some refactoring of Rdma code for simplicity git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708696 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp98
1 files changed, 53 insertions, 45 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 0a2a1db07c..4316d7b582 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -47,7 +47,6 @@ class RdmaIOHandler : public OutputControl {
ConnectionCodec::Factory* factory;
ConnectionCodec* codec;
bool readError;
- bool isClient;
void write(const framing::ProtocolInitiation&);
@@ -57,14 +56,14 @@ class RdmaIOHandler : public OutputControl {
void init(Rdma::AsynchIO* a);
void start(Poller::shared_ptr poller) {aio->start(poller);}
- void setClient() { isClient = true; }
-
// Output side
void close();
void activateOutput();
+ void initProtocolOut();
// Input side
void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff);
+ void initProtocolIn(Rdma::Buffer* buff);
// Notifications
void full(Rdma::AsynchIO& aio);
@@ -77,8 +76,7 @@ RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::Conn
identifier(c->getPeerName()),
factory(f),
codec(0),
- readError(false),
- isClient(false)
+ readError(false)
{
}
@@ -118,11 +116,6 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&) {
if ( !(aio->writable() && aio->bufferAvailable()) ) {
return;
}
- if (isClient && codec == 0) {
- codec = factory->create(*this, identifier);
- write(framing::ProtocolInitiation(codec->getVersion()));
- return;
- }
if (codec == 0) return;
if (codec->canEncode()) {
Rdma::Buffer* buff = aio->getBuffer();
@@ -134,6 +127,15 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&) {
aio->queueWriteClose();
}
+void RdmaIOHandler::initProtocolOut() {
+ // We mustn't have already started the conversation
+ // but we must be able to send
+ assert( codec == 0 );
+ assert( aio->writable() && aio->bufferAvailable() );
+ codec = factory->create(*this, identifier);
+ write(framing::ProtocolInitiation(codec->getVersion()));
+}
+
void RdmaIOHandler::error(Rdma::AsynchIO&) {
close();
}
@@ -151,34 +153,36 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
return;
}
size_t decoded = 0;
- if (codec) { // Already initiated
- try {
+ try {
+ if (codec) {
decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
- }catch(const std::exception& e){
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
+ }else{
+ // Need to start protocol processing
+ initProtocolIn(buff);
}
- }else{
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- framing::ProtocolInitiation protocolInit;
- if (protocolInit.decode(in)) {
- decoded = in.getPosition();
- QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
- try {
- codec = factory->create(protocolInit.getVersion(), *this, identifier);
- if (!codec) {
- //TODO: may still want to revise this...
- //send valid version header & close connection.
- write(framing::ProtocolInitiation(framing::highestProtocolVersion));
- readError = true;
- aio->queueWriteClose();
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- readError = true;
- aio->queueWriteClose();
- }
+ }catch(const std::exception& e){
+ QPID_LOG(error, e.what());
+ readError = true;
+ aio->queueWriteClose();
+ }
+}
+
+void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ framing::ProtocolInitiation protocolInit;
+ size_t decoded = 0;
+ if (protocolInit.decode(in)) {
+ decoded = in.getPosition();
+ QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
+
+ codec = factory->create(protocolInit.getVersion(), *this, identifier);
+
+ // If we failed to create the codec then we don't understand the offered protocol version
+ if (!codec) {
+ // send valid version header & close connection.
+ write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+ readError = true;
+ aio->queueWriteClose();
}
}
}
@@ -201,7 +205,7 @@ class RdmaIOProtocolFactory : public ProtocolFactory {
void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*);
void connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
void disconnected(Rdma::Connection::intrusive_ptr&);
- void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+ void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectFailedCallback);
};
// Static instance to initialise plugin
@@ -247,7 +251,7 @@ bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const R
ci->addContext(async);
return true;
} catch (const Rdma::Exception& e) {
- QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): " << e.what());
+ QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma exception): " << e.what());
} catch (const std::exception& e) {
QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what());
}
@@ -291,28 +295,29 @@ void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::F
boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact)));
-
+
listener->start(poller);
}
// Only used for outgoing connections (in federation)
-void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
+void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectFailedCallback failed) {
+ failed(-1, "Connection rejected");
}
// Do the same as connection request and established but mark a client too
void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp,
ConnectionCodec::Factory* f) {
(void) request(ci, cp, f);
- RdmaIOHandler* async = ci->getContext<RdmaIOHandler>();
- async->setClient();
established(poller, ci);
+ RdmaIOHandler* async = ci->getContext<RdmaIOHandler>();
+ async->initProtocolOut();
}
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t p,
ConnectionCodec::Factory* f,
- ConnectFailedCallback)
+ ConnectFailedCallback failed)
{
::addrinfo *res;
::addrinfo hints = {};
@@ -325,13 +330,16 @@ void RdmaIOProtocolFactory::connect(
throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n)));
}
- Rdma::Connector c(
+ Rdma::Connector* c =
+ new Rdma::Connector(
*res->ai_addr,
Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f),
boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1),
- boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2));
+ boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed));
+
+ c->start(poller);
}
}} // namespace qpid::sys