diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-06-08 14:34:45 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-06-08 14:34:45 +0000 |
| commit | 4274737d8315848edc4fa0ccb534482202ce5658 (patch) | |
| tree | ef8e34f05278aa13c2e41425156a3d5a6a08ffc2 /cpp/src/qpid/sys/RdmaIOPlugin.cpp | |
| parent | a811f1b32a6d3e1b9a39c36829ac6847ed6d8e42 (diff) | |
| download | qpid-python-4274737d8315848edc4fa0ccb534482202ce5658.tar.gz | |
Plumbed in an a connection abort operation to the OutputHandler
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@782649 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/RdmaIOPlugin.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 8afe8ba5ef..f931aeb5ea 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -58,6 +58,7 @@ class RdmaIOHandler : public OutputControl { // Output side void close(); + void abort(); void activateOutput(); void giveReadCredit(int32_t credit); void initProtocolOut(); @@ -65,11 +66,11 @@ class RdmaIOHandler : public OutputControl { // Input side void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff); void initProtocolIn(Rdma::Buffer* buff); - + // Notifications void full(Rdma::AsynchIO& aio); void idle(Rdma::AsynchIO& aio); - void error(Rdma::AsynchIO& aio); + void error(Rdma::AsynchIO& aio); }; RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) : @@ -89,7 +90,7 @@ RdmaIOHandler::~RdmaIOHandler() { if (codec) codec->closed(); delete codec; - + aio->deferDelete(); } @@ -107,6 +108,10 @@ void RdmaIOHandler::close() { aio->queueWriteClose(); } +// TODO: Dummy implementation, need to fill this in for heartbeat timeout to work +void RdmaIOHandler::abort() { +} + void RdmaIOHandler::activateOutput() { aio->notifyPendingWrite(); } @@ -145,7 +150,7 @@ void RdmaIOHandler::full(Rdma::AsynchIO&) { QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); } -// TODO: Dummy implementation of read throttling +// TODO: Dummy implementation of read throttling void RdmaIOHandler::giveReadCredit(int32_t) { } @@ -162,7 +167,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { if (codec) { decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); }else{ - // Need to start protocol processing + // Need to start protocol processing initProtocolIn(buff); } }catch(const std::exception& e){ @@ -181,13 +186,13 @@ void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); codec = factory->create(protocolInit.getVersion(), *this, identifier); - + // If we failed to create the codec then we don't understand the offered protocol version if (!codec) { // send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; - aio->queueWriteClose(); + aio->queueWriteClose(); } } } @@ -217,7 +222,7 @@ class RdmaIOProtocolFactory : public ProtocolFactory { static class RdmaIOPlugin : public Plugin { void earlyInitialize(Target&) { } - + void initialize(Target& target) { // Check whether we actually have any rdma devices if ( Rdma::deviceCount() == 0 ) { @@ -257,7 +262,7 @@ bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const R 0, // boost::bind(&RdmaIOHandler::full, async, _1), boost::bind(&RdmaIOHandler::error, async, _1)); async->init(aio); - + // Record aio so we can get it back from a connection ci->addContext(async); return true; @@ -300,7 +305,7 @@ void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::F sin.sin_addr.s_addr = INADDR_ANY; listener.reset( - new Rdma::Listener((const sockaddr&)(sin), + new Rdma::Listener((const sockaddr&)(sin), Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), @@ -323,7 +328,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); async->initProtocolOut(); } - + void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t p, |
