summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/RdmaIOPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/RdmaIOPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp31
1 files changed, 18 insertions, 13 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 8afe8ba5ef..f931aeb5ea 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -58,6 +58,7 @@ class RdmaIOHandler : public OutputControl {
// Output side
void close();
+ void abort();
void activateOutput();
void giveReadCredit(int32_t credit);
void initProtocolOut();
@@ -65,11 +66,11 @@ class RdmaIOHandler : public OutputControl {
// Input side
void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff);
void initProtocolIn(Rdma::Buffer* buff);
-
+
// Notifications
void full(Rdma::AsynchIO& aio);
void idle(Rdma::AsynchIO& aio);
- void error(Rdma::AsynchIO& aio);
+ void error(Rdma::AsynchIO& aio);
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) :
@@ -89,7 +90,7 @@ RdmaIOHandler::~RdmaIOHandler() {
if (codec)
codec->closed();
delete codec;
-
+
aio->deferDelete();
}
@@ -107,6 +108,10 @@ void RdmaIOHandler::close() {
aio->queueWriteClose();
}
+// TODO: Dummy implementation, need to fill this in for heartbeat timeout to work
+void RdmaIOHandler::abort() {
+}
+
void RdmaIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
@@ -145,7 +150,7 @@ void RdmaIOHandler::full(Rdma::AsynchIO&) {
QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]");
}
-// TODO: Dummy implementation of read throttling
+// TODO: Dummy implementation of read throttling
void RdmaIOHandler::giveReadCredit(int32_t) {
}
@@ -162,7 +167,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
if (codec) {
decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
}else{
- // Need to start protocol processing
+ // Need to start protocol processing
initProtocolIn(buff);
}
}catch(const std::exception& e){
@@ -181,13 +186,13 @@ void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")");
codec = factory->create(protocolInit.getVersion(), *this, identifier);
-
+
// If we failed to create the codec then we don't understand the offered protocol version
if (!codec) {
// send valid version header & close connection.
write(framing::ProtocolInitiation(framing::highestProtocolVersion));
readError = true;
- aio->queueWriteClose();
+ aio->queueWriteClose();
}
}
}
@@ -217,7 +222,7 @@ class RdmaIOProtocolFactory : public ProtocolFactory {
static class RdmaIOPlugin : public Plugin {
void earlyInitialize(Target&) {
}
-
+
void initialize(Target& target) {
// Check whether we actually have any rdma devices
if ( Rdma::deviceCount() == 0 ) {
@@ -257,7 +262,7 @@ bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const R
0, // boost::bind(&RdmaIOHandler::full, async, _1),
boost::bind(&RdmaIOHandler::error, async, _1));
async->init(aio);
-
+
// Record aio so we can get it back from a connection
ci->addContext(async);
return true;
@@ -300,7 +305,7 @@ void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::F
sin.sin_addr.s_addr = INADDR_ANY;
listener.reset(
- new Rdma::Listener((const sockaddr&)(sin),
+ new Rdma::Listener((const sockaddr&)(sin),
Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1),
boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2),
@@ -323,7 +328,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio
RdmaIOHandler* async = ci->getContext<RdmaIOHandler>();
async->initProtocolOut();
}
-
+
void RdmaIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t p,