/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/client/Connector.h"

#include "qpid/client/Bounds.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/rdma/RdmaIO.h"
#include "qpid/sys/rdma/rdma_exception.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/SecurityLayer.h"
#include "qpid/Msg.h"

// NOTE(review): the targets of the remaining #include directives were missing
// from the text under review (only bare "#include" tokens were left). The list
// below is rebuilt from the names this file actually uses (boost::bind,
// boost::format/str, assert, std::deque, std::auto_ptr, std::runtime_error,
// std::string) -- confirm it against version control.
#include <boost/bind.hpp>
#include <boost/format.hpp>

#include <cassert>
#include <deque>
#include <memory>
#include <stdexcept>
#include <string>

// This stuff needs to be abstracted out of here to a platform specific file
// NOTE(review): the header named here was also lost; <netdb.h> is presumed --
// TODO confirm.
#include <netdb.h>

namespace qpid {
namespace client {

using namespace qpid::sys;
using namespace qpid::framing;
using boost::format;
using boost::str;

/**
 * Client-side Connector that carries AMQP frames over an Rdma connection.
 *
 * The object acts as the OutputHandler handed to callers (see
 * getOutputHandler()) and as the default Codec used to encode queued frames
 * and decode received buffers; an activated security layer replaces it as the
 * codec in readbuff()/writebuff().
 *
 * Everything except the constructor is private: instances are created through
 * the factory registered at the bottom of the class declaration.
 */
class RdmaConnector : public Connector, public sys::Codec
{
    typedef std::deque<framing::AMQFrame> Frames;

    const uint16_t maxFrameSize;
    // 'lock' is held by send(), canEncode() and encode() while they touch
    // frames, lastEof and currentSize.
    sys::Mutex lock;
    Frames frames;
    size_t lastEof; // Position after last EOF in frames
    uint64_t currentSize; // Sum of encodedSize() of the frames still queued
    Bounds* bounds;

    framing::ProtocolVersion version;
    bool initiated; // Set once decode() has tried to read the protocol initiation

    // Taken by most paths that read or change dataConnected.
    sys::Mutex dataConnectedLock;
    bool dataConnected;

    sys::ShutdownHandler* shutdownHandler;
    framing::InputHandler* input;
    framing::InitiationHandler* initialiser;
    framing::OutputHandler* output;

    Rdma::AsynchIO* aio;
    Rdma::Connector* acon;
    sys::Poller::shared_ptr poller;
    std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;

    ~RdmaConnector();

    // Callbacks
    void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&);
    void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType);
    void disconnected();
    void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&);

    void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
    void writebuff(Rdma::AsynchIO&);
    void writeDataBlock(const framing::AMQDataBlock& data);
    void dataError(Rdma::AsynchIO&);
    void drained();
    void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio);
    void dataStopped(Rdma::AsynchIO* aio);

    std::string identifier;

    void connect(const std::string& host, const std::string& port);
    void close();
    void send(framing::AMQFrame& frame);
    void abort() {} // TODO: need to fix this for heartbeat timeouts to work

    void setInputHandler(framing::InputHandler* handler);
    void setShutdownHandler(sys::ShutdownHandler* handler);
    sys::ShutdownHandler* getShutdownHandler() const;
    framing::OutputHandler* getOutputHandler();
    const std::string& getIdentifier() const;
    void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>);
    const qpid::sys::SecuritySettings* getSecuritySettings() { return 0; }

    size_t decode(const char* buffer, size_t size);
    size_t encode(char* buffer, size_t size);
    bool canEncode();

public:
    RdmaConnector(Poller::shared_ptr,
                  framing::ProtocolVersion pVersion,
                  const ConnectionSettings&,
                  ConnectionImpl*);
};

// Static constructor which registers connector here
namespace {
    // Factory function handed to Connector::registerFactory.
    Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
        return new RdmaConnector(p, v, s, c);
    }

    // Registers the factory under both the "rdma" and "ib" names when the
    // file-scope 'init' object is constructed.
    struct StaticInit {
        StaticInit() {
            Connector::registerFactory("rdma", &create);
            Connector::registerFactory("ib", &create);
        }
    } init;
}

/**
 * Builds an unconnected connector.
 *
 * Every raw pointer member starts out null; input, initialiser and output
 * were previously left uninitialised.
 */
RdmaConnector::RdmaConnector(Poller::shared_ptr p,
                             ProtocolVersion ver,
                             const ConnectionSettings& settings,
                             ConnectionImpl* cimpl)
    : maxFrameSize(settings.maxFrameSize),
      lastEof(0),
      currentSize(0),
      bounds(cimpl),
      version(ver),
      initiated(false),
      dataConnected(false),
      shutdownHandler(0),
      input(0),
      initialiser(0),
      output(0),
      aio(0),
      acon(0),
      poller(p)
{
    QPID_LOG(debug, "RdmaConnector created for " << version);
}

namespace {
    // Callbacks passed to stop() by the destructor; each deletes the object
    // it is handed.
    void deleteAsynchIO(Rdma::AsynchIO& aio) {
        delete &aio;
    }

    void deleteConnector(Rdma::ConnectionManager& con) {
        delete &con;
    }
}

/**
 * Asks any still-present AsynchIO / Connector to stop, passing a callback
 * that deletes it.
 */
RdmaConnector::~RdmaConnector() {
    QPID_LOG(debug, "~RdmaConnector " << identifier);
    if (aio) {
        aio->stop(deleteAsynchIO);
    }
    if (acon) {
        acon->stop(deleteConnector);
    }
}

/**
 * Creates the Rdma::Connector for host:port and starts it on the poller.
 * The connected/connectionError/disconnected/rejected members are bound as
 * its callbacks.
 */
void RdmaConnector::connect(const std::string& host, const std::string& port){
    Mutex::ScopedLock l(dataConnectedLock);
    assert(!dataConnected);

    acon = new Rdma::Connector(
        Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
        boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
        boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
        boost::bind(&RdmaConnector::disconnected, this),
        boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));

    SocketAddress sa(host, port);
    acon->start(poller, sa);
}

// The following only gets run when connected
/**
 * Sets up the data channel: creates the AsynchIO, records the identifier,
 * queues the protocol initiation and starts the AsynchIO on the poller.
 *
 * If any step throws, the error is logged and connectionStopped() is called
 * to tear everything down.
 */
void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) {
    try {
        Mutex::ScopedLock l(dataConnectedLock);
        assert(!dataConnected);

        Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();

        aio = new Rdma::AsynchIO(q,
            cp.rdmaProtocolVersion,
            cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
            boost::bind(&RdmaConnector::readbuff, this, _1, _2),
            boost::bind(&RdmaConnector::writebuff, this, _1),
            0, // write buffers full
            boost::bind(&RdmaConnector::dataError, this, _1));

        identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());

        ProtocolInitiation protocolInit(version);
        writeDataBlock(protocolInit);

        aio->start(poller);

        dataConnected = true;

        return;
    } catch (const Rdma::Exception& e) {
        QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what());
    } catch (const std::exception& e) {
        QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what());
    }
    // Only reached when one of the handlers above ran.
    dataConnected = false;
    connectionStopped(acon, aio);
}

/** Connection-level error callback: logs and tears down via connectionStopped(). */
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) {
    QPID_LOG(debug, "Connection Error " << identifier);
    connectionStopped(acon, aio);
}

// Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects
// so we need to check whether the data connection is started or not in here
void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) {
    QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
    // NOTE(review): dataConnected is read here without holding
    // dataConnectedLock, unlike disconnected()/dataError()/close().
    if (dataConnected) {
        disconnected();
    } else {
        connectionStopped(acon, aio);
    }
}

/**
 * Disconnect callback. Clears dataConnected (once) and then asks the
 * AsynchIO to call drained().
 */
void RdmaConnector::disconnected() {
    QPID_LOG(debug, "Connection disconnected " << identifier);
    {
        Mutex::ScopedLock l(dataConnectedLock);
        // If we're closed already then we'll get to drained() anyway
        if (!dataConnected) return;
        dataConnected = false;
    }
    // Make sure that all the disconnected actions take place on the data "thread"
    aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
}

/**
 * Data-channel error callback. Clears dataConnected (once) and calls
 * drained() directly.
 */
void RdmaConnector::dataError(Rdma::AsynchIO&) {
    QPID_LOG(debug, "Data Error " << identifier);
    {
        Mutex::ScopedLock l(dataConnectedLock);
        // If we're closed already then we'll get to drained() anyway
        if (!dataConnected) return;
        dataConnected = false;
    }
    drained();
}

/**
 * Closes the connector. Clears dataConnected (once) and asks the AsynchIO to
 * drain its write queue, with drained() as the completion callback.
 */
void RdmaConnector::close() {
    QPID_LOG(debug, "RdmaConnector::close " << identifier);
    {
        Mutex::ScopedLock l(dataConnectedLock);
        if (!dataConnected) return;
        dataConnected = false;
    }
    aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
}

/**
 * First teardown step: detaches the AsynchIO from this object and stops it,
 * passing dataStopped() (bound to the detached pointer) as the callback.
 */
void RdmaConnector::drained() {
    QPID_LOG(debug, "RdmaConnector::drained " << identifier);
    assert(!dataConnected);
    assert(aio);
    Rdma::AsynchIO* a = aio;
    aio = 0;
    a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
}

/**
 * Second teardown step: detaches the Rdma::Connector and stops it, passing
 * connectionStopped() (bound to both detached pointers) as the callback.
 */
void RdmaConnector::dataStopped(Rdma::AsynchIO* a) {
    QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier);
    assert(!dataConnected);
    assert(acon);
    Rdma::Connector* c = acon;
    acon = 0;
    c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a));
}

/**
 * Final teardown step: deletes the given AsynchIO and Connector (either may
 * be null) and invokes the shutdown handler at most once.
 */
void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) {
    QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier);
    assert(!dataConnected);
    aio = 0;
    acon = 0;
    delete a;
    delete c;
    if (shutdownHandler) {
        // Clear the member before the call so the handler is not invoked twice.
        ShutdownHandler* s = shutdownHandler;
        shutdownHandler = 0;
        s->shutdown();
    }
}

void RdmaConnector::setInputHandler(InputHandler* handler){
    input = handler;
}

void RdmaConnector::setShutdownHandler(ShutdownHandler* handler){
    shutdownHandler = handler;
}

OutputHandler* RdmaConnector::getOutputHandler(){
    return this;
}

sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const {
    return shutdownHandler;
}

const std::string& RdmaConnector::getIdentifier() const {
    return identifier;
}

/**
 * Queues a frame for writing. Frames sent while not data-connected are
 * dropped. The AsynchIO is notified once a frameset is complete or a
 * buffer's worth of data is pending.
 */
void RdmaConnector::send(AMQFrame& frame) {
    // It is possible that we are called to write after we are already shutting down
    Mutex::ScopedLock l(dataConnectedLock);
    if (!dataConnected) return;

    bool notifyWrite = false;
    {
        Mutex::ScopedLock framesLock(lock);
        frames.push_back(frame);
        //only ask to write if this is the end of a frameset or if we
        //already have a buffers worth of data
        currentSize += frame.encodedSize();
        if (frame.getEof()) {
            lastEof = frames.size();
            notifyWrite = true;
        } else {
            notifyWrite = (currentSize >= maxFrameSize);
        }
    }
    if (notifyWrite) aio->notifyPendingWrite();
}

// Called in IO thread. (write idle routine)
// This is NOT only called in response to previously calling notifyPendingWrite
void RdmaConnector::writebuff(Rdma::AsynchIO&) {
    // It's possible to be disconnected and be writable
    Mutex::ScopedLock l(dataConnectedLock);
    if (!dataConnected) {
        return;
    }

    // An activated security layer takes over encoding from this object.
    Codec* codec = this;
    if (securityLayer.get()) {
        codec = securityLayer.get();
    }

    if (!codec->canEncode()) {
        return;
    }

    Rdma::Buffer* buffer = aio->getSendBuffer();
    if (buffer) {
        size_t encoded = codec->encode(buffer->bytes(), buffer->byteCount());
        buffer->dataCount(encoded);
        aio->queueWrite(buffer);
    }
}

/** True when the AsynchIO is writable and there is enough queued to be worth encoding. */
bool RdmaConnector::canEncode()
{
    Mutex::ScopedLock l(lock);
    //have at least one full frameset or a whole buffers worth of data
    return aio->writable() && (lastEof || currentSize >= maxFrameSize);
}

/**
 * Encodes as many whole queued frames as fit into buffer.
 *
 * @param buffer destination memory
 * @param size   capacity of buffer in bytes
 * @return number of bytes written; the same amount is released from bounds
 *         when bounds is set
 */
size_t RdmaConnector::encode(char* buffer, size_t size)
{
    framing::Buffer out(buffer, size);
    size_t bytesWritten(0);
    {
        Mutex::ScopedLock l(lock);
        while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
            frames.front().encode(out);
            QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front());
            frames.pop_front();
            // lastEof is a position in 'frames', so it moves down with each pop.
            if (lastEof) --lastEof;
        }
        bytesWritten = size - out.available();
        currentSize -= bytesWritten;
    }
    if (bounds) bounds->reduce(bytesWritten);
    return bytesWritten;
}

/** Read callback: hands the received bytes to the security layer if one is active, else to this object. */
void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
    Codec* codec = this;
    if (securityLayer.get()) {
        codec = securityLayer.get();
    }
    codec->decode(buff->bytes(), buff->dataCount());
}

/**
 * Decodes received bytes. The first call attempts to read a protocol
 * initiation; every complete frame found afterwards is passed to the input
 * handler.
 *
 * @return number of bytes consumed from buffer
 */
size_t RdmaConnector::decode(const char* buffer, size_t size)
{
    // framing::Buffer wants a non-const pointer even though we only read here.
    framing::Buffer in(const_cast<char*>(buffer), size);
    if (!initiated) {
        framing::ProtocolInitiation protocolInit;
        if (protocolInit.decode(in)) {
            //TODO: check the version is correct
            QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
        }
        initiated = true;
    }
    AMQFrame frame;
    while(frame.decode(in)){
        QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
        input->received(frame);
    }
    return size - in.available();
}

/**
 * Encodes data into a fresh send buffer and queues it for writing.
 *
 * @throws std::runtime_error if no send buffer is available (writebuff()
 *         treats a null return from getSendBuffer() as possible, so it is
 *         checked here as well instead of being dereferenced).
 */
void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
    Rdma::Buffer* buff = aio->getSendBuffer();
    if (!buff) {
        throw std::runtime_error("Rdma: no send buffer available");
    }
    framing::Buffer out(buff->bytes(), buff->byteCount());
    data.encode(out);
    buff->dataCount(data.encodedSize());
    aio->queueWrite(buff);
}

/** Takes ownership of the security layer and initialises it with this object as its codec. */
void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
    securityLayer = sl;
    securityLayer->init(this);
}

}} // namespace qpid::client