diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-10-28 22:02:53 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-10-28 22:02:53 +0000 |
| commit | 54d70e027abc1e3d0450d19188ac4ca3d07f4002 (patch) | |
| tree | 0253fb816e0697c0478090935752912421247651 /cpp | |
| parent | c15852b63c41477abc0e4c145a95d10212a94e43 (diff) | |
| download | qpid-python-54d70e027abc1e3d0450d19188ac4ca3d07f4002.tar.gz | |
Make federation work over Rdma links
- Some refactoring of Rdma code for simplicity
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@708696 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 98 |
1 files changed, 53 insertions, 45 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 0a2a1db07c..4316d7b582 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -47,7 +47,6 @@ class RdmaIOHandler : public OutputControl { ConnectionCodec::Factory* factory; ConnectionCodec* codec; bool readError; - bool isClient; void write(const framing::ProtocolInitiation&); @@ -57,14 +56,14 @@ class RdmaIOHandler : public OutputControl { void init(Rdma::AsynchIO* a); void start(Poller::shared_ptr poller) {aio->start(poller);} - void setClient() { isClient = true; } - // Output side void close(); void activateOutput(); + void initProtocolOut(); // Input side void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff); + void initProtocolIn(Rdma::Buffer* buff); // Notifications void full(Rdma::AsynchIO& aio); @@ -77,8 +76,7 @@ RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::Conn identifier(c->getPeerName()), factory(f), codec(0), - readError(false), - isClient(false) + readError(false) { } @@ -118,11 +116,6 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&) { if ( !(aio->writable() && aio->bufferAvailable()) ) { return; } - if (isClient && codec == 0) { - codec = factory->create(*this, identifier); - write(framing::ProtocolInitiation(codec->getVersion())); - return; - } if (codec == 0) return; if (codec->canEncode()) { Rdma::Buffer* buff = aio->getBuffer(); @@ -134,6 +127,15 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&) { aio->queueWriteClose(); } +void RdmaIOHandler::initProtocolOut() { + // We mustn't have already started the conversation + // but we must be able to send + assert( codec == 0 ); + assert( aio->writable() && aio->bufferAvailable() ); + codec = factory->create(*this, identifier); + write(framing::ProtocolInitiation(codec->getVersion())); +} + void RdmaIOHandler::error(Rdma::AsynchIO&) { close(); } @@ -151,34 +153,36 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { return; } size_t decoded = 0; - if (codec) { // Already initiated - try { + try { + if (codec) { decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); - }catch(const std::exception& e){ - QPID_LOG(error, e.what()); - readError = true; - aio->queueWriteClose(); + }else{ + // Need to start protocol processing + initProtocolIn(buff); } - }else{ - framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - framing::ProtocolInitiation protocolInit; - if (protocolInit.decode(in)) { - decoded = in.getPosition(); - QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); - try { - codec = factory->create(protocolInit.getVersion(), *this, identifier); - if (!codec) { - //TODO: may still want to revise this... - //send valid version header & close connection. - write(framing::ProtocolInitiation(framing::highestProtocolVersion)); - readError = true; - aio->queueWriteClose(); - } - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - readError = true; - aio->queueWriteClose(); - } + }catch(const std::exception& e){ + QPID_LOG(error, e.what()); + readError = true; + aio->queueWriteClose(); + } +} + +void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { + framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); + framing::ProtocolInitiation protocolInit; + size_t decoded = 0; + if (protocolInit.decode(in)) { + decoded = in.getPosition(); + QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); + + codec = factory->create(protocolInit.getVersion(), *this, identifier); + + // If we failed to create the codec then we don't understand the offered protocol version + if (!codec) { + // send valid version header & close connection. + write(framing::ProtocolInitiation(framing::highestProtocolVersion)); + readError = true; + aio->queueWriteClose(); } } } @@ -201,7 +205,7 @@ class RdmaIOProtocolFactory : public ProtocolFactory { void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); void connectionError(Rdma::Connection::intrusive_ptr&, Rdma::ErrorType); void disconnected(Rdma::Connection::intrusive_ptr&); - void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&); + void rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectFailedCallback); }; // Static instance to initialise plugin @@ -247,7 +251,7 @@ bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const R ci->addContext(async); return true; } catch (const Rdma::Exception& e) { - QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma excepion): " << e.what()); + QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma exception): " << e.what()); } catch (const std::exception& e) { QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what()); } @@ -291,28 +295,29 @@ void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::F boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact))); - + listener->start(poller); } // Only used for outgoing connections (in federation) -void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) { +void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&, ConnectFailedCallback failed) { + failed(-1, "Connection rejected"); } // Do the same as connection request and established but mark a client too void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp, ConnectionCodec::Factory* f) { (void) request(ci, cp, f); - RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); - async->setClient(); established(poller, ci); + RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); + async->initProtocolOut(); } void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t p, ConnectionCodec::Factory* f, - ConnectFailedCallback) + ConnectFailedCallback failed) { ::addrinfo *res; ::addrinfo hints = {}; @@ -325,13 +330,16 @@ void RdmaIOProtocolFactory::connect( throw Exception(QPID_MSG("Rdma: Cannot resolve " << host << ": " << ::gai_strerror(n))); } - Rdma::Connector c( + Rdma::Connector* c = + new Rdma::Connector( *res->ai_addr, Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), - boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2)); + boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed)); + + c->start(poller); } }} // namespace qpid::sys |
