/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/sys/ProtocolFactory.h" #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" #include "qpid/broker/NameGenerator.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/log/Statement.h" #include "qpid/sys/rdma/RdmaIO.h" #include "qpid/sys/rdma/rdma_exception.h" #include "qpid/sys/OutputControl.h" #include "qpid/sys/SecuritySettings.h" #include #include #include using std::auto_ptr; using std::string; using std::stringstream; namespace qpid { namespace sys { class RdmaIOHandler : public OutputControl { std::string identifier; ConnectionCodec::Factory* factory; ConnectionCodec* codec; bool readError; sys::Mutex pollingLock; bool polling; Rdma::AsynchIO* aio; Rdma::Connection::intrusive_ptr connection; void write(const framing::ProtocolInitiation&); void disconnectAction(); public: RdmaIOHandler(Rdma::Connection::intrusive_ptr c, ConnectionCodec::Factory* f); ~RdmaIOHandler(); void init(Rdma::AsynchIO* a); void start(Poller::shared_ptr poller); // Output side void close(); void abort(); void activateOutput(); void initProtocolOut(); // Input side void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff); void initProtocolIn(Rdma::Buffer* buff); // Notifications void full(Rdma::AsynchIO& aio); void idle(Rdma::AsynchIO& aio); void error(Rdma::AsynchIO& aio); void disconnected(); void drained(); }; RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) : identifier(broker::QPID_NAME_PREFIX+c->getFullName()), factory(f), codec(0), readError(false), polling(false), connection(c) { } RdmaIOHandler::~RdmaIOHandler() { if (codec) codec->closed(); delete codec; delete aio; } void RdmaIOHandler::init(Rdma::AsynchIO* a) { aio = a; } void RdmaIOHandler::start(Poller::shared_ptr poller) { Mutex::ScopedLock l(pollingLock); assert(!polling); polling = true; aio->start(poller); } void RdmaIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "Rdma: SENT [" << identifier << "]: INIT(" << data << ")"); Rdma::Buffer* buff = aio->getSendBuffer(); assert(buff); framing::Buffer out(buff->bytes(), buff->byteCount()); data.encode(out); buff->dataCount(data.encodedSize()); aio->queueWrite(buff); } void RdmaIOHandler::close() { aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this)); } // TODO: Dummy implementation, need to fill this in for heartbeat timeout to work void RdmaIOHandler::abort() { } void RdmaIOHandler::activateOutput() { aio->notifyPendingWrite(); } void RdmaIOHandler::idle(Rdma::AsynchIO&) { // TODO: Shouldn't need this test as idle() should only ever be called when // the connection is writable anyway if ( !aio->writable() ) { return; } if (codec == 0) return; if (!codec->canEncode()) { return; } Rdma::Buffer* buff = aio->getSendBuffer(); if (buff) { size_t encoded=codec->encode(buff->bytes(), buff->byteCount()); buff->dataCount(encoded); aio->queueWrite(buff); if (codec->isClosed()) { close(); } } } void RdmaIOHandler::initProtocolOut() { // We mustn't have already started the conversation // but we must be able to send assert( codec == 0 ); assert( aio->writable() ); codec = factory->create(*this, identifier, SecuritySettings()); write(framing::ProtocolInitiation(codec->getVersion())); } void RdmaIOHandler::error(Rdma::AsynchIO&) { disconnected(); } namespace { void stopped(RdmaIOHandler* async) { delete async; } } void RdmaIOHandler::disconnectAction() { { Mutex::ScopedLock l(pollingLock); // If we're closed already then we'll get to drained() anyway if (!polling) return; polling = false; } aio->stop(boost::bind(&stopped, this)); } void RdmaIOHandler::disconnected() { aio->requestCallback(boost::bind(&RdmaIOHandler::disconnectAction, this)); } void RdmaIOHandler::drained() { // We know we've drained the write queue now, but we don't have to do anything // because we can rely on the client to disconnect to trigger the connection // cleanup. } void RdmaIOHandler::full(Rdma::AsynchIO&) { QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); } // The logic here is subtly different from TCP as RDMA is message oriented // so we define that an RDMA message is a frame - in this case there is no putting back // of any message remainder - there shouldn't be any. And what we read here can't be // smaller than a frame void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { if (readError) { return; } try { if (codec) { (void) codec->decode(buff->bytes(), buff->dataCount()); }else{ // Need to start protocol processing initProtocolIn(buff); } }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; close(); } } void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { framing::Buffer in(buff->bytes(), buff->dataCount()); framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { QPID_LOG(debug, "Rdma: RECV [" << identifier << "]: INIT(" << protocolInit << ")"); codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); // If we failed to create the codec then we don't understand the offered protocol version if (!codec) { // send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; close(); } } } class RdmaIOProtocolFactory : public ProtocolFactory { auto_ptr listener; const uint16_t listeningPort; public: RdmaIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& name, const string& host, const std::string& port, ConnectionCodec::Factory*, ConnectFailedCallback); uint16_t getPort() const; private: bool request(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); void established(Poller::shared_ptr, Rdma::Connection::intrusive_ptr); void connected(Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectionCodec::Factory*); void connectionError(Rdma::Connection::intrusive_ptr, Rdma::ErrorType); void disconnected(Rdma::Connection::intrusive_ptr); void rejected(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectFailedCallback); }; // Static instance to initialise plugin static class RdmaIOPlugin : public Plugin { void earlyInitialize(Target&) { } void initialize(Target& target) { // Check whether we actually have any rdma devices if ( Rdma::deviceCount() == 0 ) { QPID_LOG(info, "Rdma: Disabled: no rdma devices found"); return; } broker::Broker* broker = dynamic_cast(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); ProtocolFactory::shared_ptr protocol(new RdmaIOProtocolFactory(opts.port, opts.connectionBacklog)); QPID_LOG(notice, "Rdma: Listening on RDMA port " << protocol->getPort()); broker->registerProtocolFactory("rdma", protocol); } } } rdmaPlugin; RdmaIOProtocolFactory::RdmaIOProtocolFactory(int16_t port, int /*backlog*/) : listeningPort(port) {} void RdmaIOProtocolFactory::established(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci) { RdmaIOHandler* async = ci->getContext(); async->start(poller); } bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp, ConnectionCodec::Factory* f) { try { if (cp.rdmaProtocolVersion == 0) { QPID_LOG(warning, "Rdma: connection from protocol version 0 client"); } RdmaIOHandler* async = new RdmaIOHandler(ci, f); Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), cp.rdmaProtocolVersion, cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, boost::bind(&RdmaIOHandler::readbuff, async, _1, _2), boost::bind(&RdmaIOHandler::idle, async, _1), 0, // boost::bind(&RdmaIOHandler::full, async, _1), boost::bind(&RdmaIOHandler::error, async, _1)); async->init(aio); // Record aio so we can get it back from a connection ci->addContext(async); return true; } catch (const Rdma::Exception& e) { QPID_LOG(error, "Rdma: Cannot accept new connection (Rdma exception): " << e.what()); } catch (const std::exception& e) { QPID_LOG(error, "Rdma: Cannot accept new connection (unknown exception): " << e.what()); } // If we get here we caught an exception so reject connection return false; } void RdmaIOProtocolFactory::connectionError(Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { } void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr ci) { // If we've got a connection already tear it down, otherwise ignore RdmaIOHandler* async = ci->getContext(); if (async) { // Make sure we don't disconnect more than once ci->removeContext(); async->disconnected(); } } uint16_t RdmaIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { listener.reset( new Rdma::Listener( Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), boost::bind(&RdmaIOProtocolFactory::request, this, _1, _2, fact))); SocketAddress sa("",boost::lexical_cast(listeningPort)); listener->start(poller, sa); } // Only used for outgoing connections (in federation) void RdmaIOProtocolFactory::rejected(Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&, ConnectFailedCallback failed) { failed(-1, "Connection rejected"); } // Do the same as connection request and established but mark a client too void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp, ConnectionCodec::Factory* f) { (void) request(ci, cp, f); established(poller, ci); RdmaIOHandler* async = ci->getContext(); async->initProtocolOut(); } void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& /*name*/, const std::string& host, const std::string& port, ConnectionCodec::Factory* f, ConnectFailedCallback failed) { Rdma::Connector* c = new Rdma::Connector( Rdma::ConnectionParams(8000, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::connected, this, poller, _1, _2, f), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), boost::bind(&RdmaIOProtocolFactory::disconnected, this, _1), boost::bind(&RdmaIOProtocolFactory::rejected, this, _1, _2, failed)); SocketAddress sa(host, port); c->start(poller, sa); } }} // namespace qpid::sys