summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-09-11 06:16:19 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-09-11 06:16:19 +0000
commit2e05b7082f5e387fc686925e5ac006485e4686db (patch)
treeb0a43e45da7cc24b65407ce6f7254e21b3fcde78 /cpp/src/qpid/client
parent468b4b6ddaa3d96bb743cdbd27ded651eea31847 (diff)
downloadqpid-python-2e05b7082f5e387fc686925e5ac006485e4686db.tar.gz
Implementation of AMQP over RDMA protocols (Infiniband)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/RdmaConnector.cpp427
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp5
2 files changed, 430 insertions, 2 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
new file mode 100644
index 0000000000..c0775ab9cd
--- /dev/null
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -0,0 +1,427 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connector.h"
+
+#include "Bounds.h"
+#include "ConnectionImpl.h"
+#include "ConnectionSettings.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/sys/rdma/RdmaIO.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/Msg.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+// This stuff needs to abstracted out of here to a platform specific file
+#include <netdb.h>
+
+namespace qpid {
+namespace client {
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::format;
+using boost::str;
+
+class RdmaConnector : public Connector, private sys::Runnable
+{
+ struct Buff;
+
+ /** Batch up frames for writing to aio. */
+ class Writer : public framing::FrameHandler {
+ typedef Rdma::Buffer BufferBase;
+ typedef std::deque<framing::AMQFrame> Frames;
+
+ const uint16_t maxFrameSize;
+ sys::Mutex lock;
+ Rdma::AsynchIO* aio;
+ BufferBase* buffer;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ framing::Buffer encode;
+ size_t framesEncoded;
+ std::string identifier;
+ Bounds* bounds;
+
+ void writeOne();
+ void newBuffer();
+
+ public:
+
+ Writer(uint16_t maxFrameSize, Bounds*);
+ ~Writer();
+ void init(std::string id, Rdma::AsynchIO*);
+ void handle(framing::AMQFrame&);
+ void write(Rdma::AsynchIO&);
+ };
+
+ const uint16_t maxFrameSize;
+ framing::ProtocolVersion version;
+ bool initiated;
+
+ sys::Mutex pollingLock;
+ bool polling;
+ bool joined;
+
+ sys::ShutdownHandler* shutdownHandler;
+ framing::InputHandler* input;
+ framing::InitiationHandler* initialiser;
+ framing::OutputHandler* output;
+
+ Writer writer;
+
+ sys::Thread receiver;
+
+ Rdma::AsynchIO* aio;
+ sys::Poller::shared_ptr poller;
+
+ ~RdmaConnector();
+
+ void run();
+ void handleClosed();
+ bool closeInternal();
+
+ // Callbacks
+ void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+ void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
+ void disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&);
+ void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
+
+ void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
+ void writebuff(Rdma::AsynchIO&);
+ void writeDataBlock(const framing::AMQDataBlock& data);
+ void eof(Rdma::AsynchIO&);
+
+ std::string identifier;
+
+ ConnectionImpl* impl;
+
+ void connect(const std::string& host, int port);
+ void close();
+ void send(framing::AMQFrame& frame);
+
+ void setInputHandler(framing::InputHandler* handler);
+ void setShutdownHandler(sys::ShutdownHandler* handler);
+ sys::ShutdownHandler* getShutdownHandler() const;
+ framing::OutputHandler* getOutputHandler();
+ const std::string& getIdentifier() const;
+
+public:
+ RdmaConnector(framing::ProtocolVersion pVersion,
+ const ConnectionSettings&,
+ ConnectionImpl*);
+};
+
+// Static constructor which registers connector here
+namespace {
+ Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
+ return new RdmaConnector(v, s, c);
+ }
+
+ struct StaticInit {
+ StaticInit() {
+ Connector::registerFactory("rdma", &create);
+ Connector::registerFactory("ib", &create);
+ };
+ } init;
+}
+
+
+RdmaConnector::RdmaConnector(ProtocolVersion ver,
+ const ConnectionSettings& settings,
+ ConnectionImpl* cimpl)
+ : maxFrameSize(settings.maxFrameSize),
+ version(ver),
+ initiated(false),
+ polling(false),
+ joined(true),
+ shutdownHandler(0),
+ writer(maxFrameSize, cimpl),
+ aio(0),
+ impl(cimpl)
+{
+ QPID_LOG(debug, "RdmaConnector created for " << version);
+}
+
+RdmaConnector::~RdmaConnector() {
+ close();
+}
+
+void RdmaConnector::connect(const std::string& host, int port){
+ Mutex::ScopedLock l(pollingLock);
+ assert(!polling);
+ assert(joined);
+ poller = Poller::shared_ptr(new Poller);
+
+ // This stuff needs to abstracted out of here to a platform specific file
+ ::addrinfo *res;
+ ::addrinfo hints;
+ hints.ai_flags = 0;
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = 0;
+ int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res);
+ if (n<0) {
+ throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+ }
+
+ Rdma::Connector* c = new Rdma::Connector(
+ *res->ai_addr,
+ Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
+ boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
+ boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
+ boost::bind(&RdmaConnector::disconnected, this, poller, _1),
+ boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
+ c->start(poller);
+
+ polling = true;
+ joined = false;
+ receiver = Thread(this);
+}
+
+// The following only gets run when connected
+void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
+ Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
+
+ aio = new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
+ boost::bind(&RdmaConnector::readbuff, this, _1, _2),
+ boost::bind(&RdmaConnector::writebuff, this, _1),
+ 0, // write buffers full
+ boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection
+ aio->start(poller);
+
+ identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
+ writer.init(identifier, aio);
+ ProtocolInitiation init(version);
+ writeDataBlock(init);
+}
+
+void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
+ QPID_LOG(trace, "Connection Error " << identifier);
+ eof(*aio);
+}
+
+void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) {
+ eof(*aio);
+}
+
+void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) {
+ QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
+ eof(*aio);
+}
+
+bool RdmaConnector::closeInternal() {
+ bool ret;
+ {
+ Mutex::ScopedLock l(pollingLock);
+ ret = polling;
+ if (polling) {
+ polling = false;
+ poller->shutdown();
+ }
+ if (joined || receiver.id() == Thread::current().id()) {
+ return ret;
+ }
+ joined = true;
+ }
+
+ receiver.join();
+ return ret;
+}
+
+void RdmaConnector::close() {
+ closeInternal();
+}
+
+void RdmaConnector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void RdmaConnector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* RdmaConnector::getOutputHandler(){
+ return this;
+}
+
+sys::ShutdownHandler* RdmaConnector::getShutdownHandler() const {
+ return shutdownHandler;
+}
+
+const std::string& RdmaConnector::getIdentifier() const {
+ return identifier;
+}
+
+void RdmaConnector::send(AMQFrame& frame) {
+ writer.handle(frame);
+}
+
+void RdmaConnector::handleClosed() {
+ if (closeInternal() && shutdownHandler)
+ shutdownHandler->shutdown();
+}
+
+RdmaConnector::Writer::Writer(uint16_t s, Bounds* b) :
+ maxFrameSize(s),
+ aio(0),
+ buffer(0),
+ lastEof(0),
+ bounds(b)
+{
+}
+
+RdmaConnector::Writer::~Writer() {
+ if (aio)
+ aio->returnBuffer(buffer);
+}
+
+void RdmaConnector::Writer::init(std::string id, Rdma::AsynchIO* a) {
+ Mutex::ScopedLock l(lock);
+ identifier = id;
+ aio = a;
+ newBuffer();
+}
+void RdmaConnector::Writer::handle(framing::AMQFrame& frame) {
+ Mutex::ScopedLock l(lock);
+ frames.push_back(frame);
+ // Don't bother to send anything unless we're at the end of a frameset (assembly in 0-10 terminology)
+ if (frame.getEof()) {
+ lastEof = frames.size();
+ QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
+ aio->notifyPendingWrite();
+ }
+ QPID_LOG(trace, "SENT " << identifier << ": " << frame);
+}
+
+void RdmaConnector::Writer::writeOne() {
+ assert(buffer);
+ QPID_LOG(trace, "Write buffer " << encode.getPosition()
+ << " bytes " << framesEncoded << " frames ");
+ framesEncoded = 0;
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encode.getPosition();
+ aio->queueWrite(buffer);
+ newBuffer();
+}
+
+void RdmaConnector::Writer::newBuffer() {
+ buffer = aio->getBuffer();
+ encode = framing::Buffer(buffer->bytes, buffer->byteCount);
+ framesEncoded = 0;
+}
+
+// Called in IO thread. (write idle routine)
+// This is NOT only called in response to previously calling notifyPendingWrite
+void RdmaConnector::Writer::write(Rdma::AsynchIO&) {
+ Mutex::ScopedLock l(lock);
+ assert(buffer);
+ // If nothing to do return immediately
+ if (lastEof==0)
+ return;
+ size_t bytesWritten = 0;
+ while (aio->writable() && !frames.empty()) {
+ const AMQFrame* frame = &frames.front();
+ uint32_t size = frame->size();
+ while (size <= encode.available()) {
+ frame->encode(encode);
+ frames.pop_front();
+ ++framesEncoded;
+ bytesWritten += size;
+ if (frames.empty())
+ break;
+ frame = &frames.front();
+ size = frame->size();
+ }
+ lastEof -= framesEncoded;
+ writeOne();
+ }
+ if (bounds) bounds->reduce(bytesWritten);
+}
+
+void RdmaConnector::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+
+ if (!initiated) {
+ framing::ProtocolInitiation protocolInit;
+ if (protocolInit.decode(in)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+ }
+ initiated = true;
+ }
+ AMQFrame frame;
+ while(frame.decode(in)){
+ QPID_LOG(trace, "RECV " << identifier << ": " << frame);
+ input->received(frame);
+ }
+}
+
+void RdmaConnector::writebuff(Rdma::AsynchIO& aio_) {
+ writer.write(aio_);
+}
+
+void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
+ Rdma::Buffer* buff = aio->getBuffer();
+ framing::Buffer out(buff->bytes, buff->byteCount);
+ data.encode(out);
+ buff->dataCount = data.size();
+ aio->queueWrite(buff);
+}
+
+void RdmaConnector::eof(Rdma::AsynchIO&) {
+ handleClosed();
+}
+
+// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
+// will never be called
+void RdmaConnector::run(){
+ // Keep the connection impl in memory until run() completes.
+ //GRS: currently the ConnectionImpls destructor is where the Io thread is joined
+ //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+ //assert(protect);
+ try {
+ Dispatcher d(poller);
+
+ //aio->start(poller);
+ d.run();
+ //aio->queueForDeletion();
+ } catch (const std::exception& e) {
+ {
+ // We're no longer polling
+ Mutex::ScopedLock l(pollingLock);
+ polling = false;
+ }
+ QPID_LOG(error, e.what());
+ handleClosed();
+ }
+}
+
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 68955050b4..b736d116e1 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -137,8 +137,8 @@ void SessionImpl::suspend() //user thread
void SessionImpl::detach() //call with lock held
{
if (state == ATTACHED) {
- proxy.detach(id.getName());
setState(DETACHING);
+ proxy.detach(id.getName());
}
}
@@ -630,7 +630,8 @@ inline void SessionImpl::setState(State s) //call with lock held
inline void SessionImpl::waitFor(State s) //call with lock held
{
// We can be DETACHED at any time
- state.waitFor(States(s, DETACHED));
+ if (s == DETACHED) state.waitFor(DETACHED);
+ else state.waitFor(States(s, DETACHED));
check();
}