From 4274737d8315848edc4fa0ccb534482202ce5658 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Mon, 8 Jun 2009 14:34:45 +0000 Subject: Plumbed in an a connection abort operation to the OutputHandler git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@782649 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/AggregateOutput.cpp | 14 ++++++------ cpp/src/qpid/sys/AggregateOutput.h | 7 +++--- cpp/src/qpid/sys/AsynchIOHandler.cpp | 16 +++++++++----- cpp/src/qpid/sys/AsynchIOHandler.h | 8 +++---- cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h | 5 +++-- cpp/src/qpid/sys/OutputControl.h | 7 +++--- cpp/src/qpid/sys/RdmaIOPlugin.cpp | 31 ++++++++++++++++----------- cpp/src/qpid/sys/ssl/SslHandler.cpp | 14 ++++++++---- cpp/src/qpid/sys/ssl/SslHandler.h | 8 +++---- 9 files changed, 66 insertions(+), 44 deletions(-) (limited to 'cpp/src/qpid/sys') diff --git a/cpp/src/qpid/sys/AggregateOutput.cpp b/cpp/src/qpid/sys/AggregateOutput.cpp index fa6901d3e4..74bf6d0f85 100644 --- a/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/cpp/src/qpid/sys/AggregateOutput.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,13 +25,15 @@ namespace qpid { namespace sys { - + +void AggregateOutput::abort() { control.abort(); } + void AggregateOutput::activateOutput() { control.activateOutput(); } void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } bool AggregateOutput::hasOutput() { - for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) + for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i) if ((*i)->hasOutput()) return true; return false; } @@ -41,7 +43,7 @@ bool AggregateOutput::doOutput() bool result = false; if (!tasks.empty()) { if (next >= tasks.size()) next = next % tasks.size(); - + size_t start = next; //loop until a task generated some output while (!result) { @@ -58,7 +60,7 @@ void AggregateOutput::addOutputTask(OutputTask* t) { tasks.push_back(t); } - + void AggregateOutput::removeOutputTask(OutputTask* t) { TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t); diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h index fcd0d4c2f7..b33113796c 100644 --- a/cpp/src/qpid/sys/AggregateOutput.h +++ b/cpp/src/qpid/sys/AggregateOutput.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -43,9 +43,10 @@ namespace sys { public: AggregateOutput(OutputControl& c) : next(0), control(c) {}; //this may be called on any thread + QPID_COMMON_EXTERN void abort(); QPID_COMMON_EXTERN void activateOutput(); QPID_COMMON_EXTERN void giveReadCredit(int32_t); - + //all the following will be called on the same thread QPID_COMMON_EXTERN bool doOutput(); QPID_COMMON_EXTERN bool hasOutput(); diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 6b7e7b5145..9da4a68381 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,8 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" +#include + namespace qpid { namespace sys { @@ -75,6 +77,10 @@ void AsynchIOHandler::write(const framing::ProtocolInitiation& data) aio->queueWrite(buff); } +void AsynchIOHandler::abort() { + aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1)); +} + void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } @@ -120,7 +126,7 @@ bool AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { //send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; - aio->queueWriteClose(); + aio->queueWriteClose(); } } catch (const std::exception& e) { QPID_LOG(error, e.what()); @@ -163,7 +169,7 @@ void AsynchIOHandler::eof(AsynchIO&) { } void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) { - // If we closed with data still to send log a warning + // If we closed with data still to send log a warning if (!aio->writeQueueEmpty()) { QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)"); } @@ -198,7 +204,7 @@ void AsynchIOHandler::idle(AsynchIO&){ aio->queueWrite(buff); } if (codec->isClosed()) - aio->queueWriteClose(); + aio->queueWriteClose(); } catch (const std::exception& e) { QPID_LOG(error, e.what()); aio->queueWriteClose(); diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index 9f1d043b62..059b22a935 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -9,9 +9,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -60,7 +60,7 @@ class AsynchIOHandler : public OutputControl { QPID_COMMON_EXTERN void setClient() { isClient = true; } // Output side - QPID_COMMON_EXTERN void close(); + QPID_COMMON_EXTERN void abort(); QPID_COMMON_EXTERN void activateOutput(); QPID_COMMON_EXTERN void giveReadCredit(int32_t credit); @@ -68,7 +68,7 @@ class AsynchIOHandler : public OutputControl { QPID_COMMON_EXTERN bool readbuff(AsynchIO& aio, AsynchIOBufferBase* buff); QPID_COMMON_EXTERN void eof(AsynchIO& aio); QPID_COMMON_EXTERN void disconnect(AsynchIO& aio); - + // Notifications QPID_COMMON_EXTERN void nobuffs(AsynchIO& aio); QPID_COMMON_EXTERN void idle(AsynchIO& aio); diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h index 32809d86a1..909631a8c4 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -43,6 +43,7 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler void close() { next->close(); } size_t getBuffered() const { return next->getBuffered(); } + void abort() { next->abort(); } void activateOutput() { next->activateOutput(); } void giveReadCredit(int32_t credit) { next->giveReadCredit(credit); } void send(framing::AMQFrame& f) { next->send(f); } diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h index e9e6c57a9b..d813428b67 100644 --- a/cpp/src/qpid/sys/OutputControl.h +++ b/cpp/src/qpid/sys/OutputControl.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,10 +27,11 @@ namespace qpid { namespace sys { - class OutputControl + class OutputControl { public: virtual ~OutputControl() {} + virtual void abort() = 0; virtual void activateOutput() = 0; virtual void giveReadCredit(int32_t credit) = 0; }; diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 8afe8ba5ef..f931aeb5ea 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -58,6 +58,7 @@ class RdmaIOHandler : public OutputControl { // Output side void close(); + void abort(); void activateOutput(); void giveReadCredit(int32_t credit); void initProtocolOut(); @@ -65,11 +66,11 @@ class RdmaIOHandler : public OutputControl { // Input side void readbuff(Rdma::AsynchIO& aio, Rdma::Buffer* buff); void initProtocolIn(Rdma::Buffer* buff); - + // Notifications void full(Rdma::AsynchIO& aio); void idle(Rdma::AsynchIO& aio); - void error(Rdma::AsynchIO& aio); + void error(Rdma::AsynchIO& aio); }; RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr& c, qpid::sys::ConnectionCodec::Factory* f) : @@ -89,7 +90,7 @@ RdmaIOHandler::~RdmaIOHandler() { if (codec) codec->closed(); delete codec; - + aio->deferDelete(); } @@ -107,6 +108,10 @@ void RdmaIOHandler::close() { aio->queueWriteClose(); } +// TODO: Dummy implementation, need to fill this in for heartbeat timeout to work +void RdmaIOHandler::abort() { +} + void RdmaIOHandler::activateOutput() { aio->notifyPendingWrite(); } @@ -145,7 +150,7 @@ void RdmaIOHandler::full(Rdma::AsynchIO&) { QPID_LOG(debug, "Rdma: buffer full [" << identifier << "]"); } -// TODO: Dummy implementation of read throttling +// TODO: Dummy implementation of read throttling void RdmaIOHandler::giveReadCredit(int32_t) { } @@ -162,7 +167,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { if (codec) { decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); }else{ - // Need to start protocol processing + // Need to start protocol processing initProtocolIn(buff); } }catch(const std::exception& e){ @@ -181,13 +186,13 @@ void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { QPID_LOG(debug, "Rdma: RECV [" << identifier << "] INIT(" << protocolInit << ")"); codec = factory->create(protocolInit.getVersion(), *this, identifier); - + // If we failed to create the codec then we don't understand the offered protocol version if (!codec) { // send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; - aio->queueWriteClose(); + aio->queueWriteClose(); } } } @@ -217,7 +222,7 @@ class RdmaIOProtocolFactory : public ProtocolFactory { static class RdmaIOPlugin : public Plugin { void earlyInitialize(Target&) { } - + void initialize(Target& target) { // Check whether we actually have any rdma devices if ( Rdma::deviceCount() == 0 ) { @@ -257,7 +262,7 @@ bool RdmaIOProtocolFactory::request(Rdma::Connection::intrusive_ptr& ci, const R 0, // boost::bind(&RdmaIOHandler::full, async, _1), boost::bind(&RdmaIOHandler::error, async, _1)); async->init(aio); - + // Record aio so we can get it back from a connection ci->addContext(async); return true; @@ -300,7 +305,7 @@ void RdmaIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::F sin.sin_addr.s_addr = INADDR_ANY; listener.reset( - new Rdma::Listener((const sockaddr&)(sin), + new Rdma::Listener((const sockaddr&)(sin), Rdma::ConnectionParams(65536, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaIOProtocolFactory::established, this, poller, _1), boost::bind(&RdmaIOProtocolFactory::connectionError, this, _1, _2), @@ -323,7 +328,7 @@ void RdmaIOProtocolFactory::connected(Poller::shared_ptr poller, Rdma::Connectio RdmaIOHandler* async = ci->getContext(); async->initProtocolOut(); } - + void RdmaIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t p, diff --git a/cpp/src/qpid/sys/ssl/SslHandler.cpp b/cpp/src/qpid/sys/ssl/SslHandler.cpp index 3c7e2190e7..282c3ce8c8 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.cpp +++ b/cpp/src/qpid/sys/ssl/SslHandler.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,8 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" +#include + namespace qpid { namespace sys { namespace ssl { @@ -76,6 +78,10 @@ void SslHandler::write(const framing::ProtocolInitiation& data) aio->queueWrite(buff); } +void SslHandler::abort() { + // TODO: can't implement currently as underlying functionality not implemented + // aio->requestCallback(boost::bind(&SslHandler::eof, this, _1)); +} void SslHandler::activateOutput() { aio->notifyPendingWrite(); } @@ -111,7 +117,7 @@ void SslHandler::readbuff(SslIO& , SslIO::BufferBase* buff) { //send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; - aio->queueWriteClose(); + aio->queueWriteClose(); } } catch (const std::exception& e) { QPID_LOG(error, e.what()); @@ -140,7 +146,7 @@ void SslHandler::eof(SslIO&) { } void SslHandler::closedSocket(SslIO&, const SslSocket& s) { - // If we closed with data still to send log a warning + // If we closed with data still to send log a warning if (!aio->writeQueueEmpty()) { QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)"); } diff --git a/cpp/src/qpid/sys/ssl/SslHandler.h b/cpp/src/qpid/sys/ssl/SslHandler.h index ae654d7ad2..8f6b8e732a 100644 --- a/cpp/src/qpid/sys/ssl/SslHandler.h +++ b/cpp/src/qpid/sys/ssl/SslHandler.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -56,7 +56,7 @@ class SslHandler : public OutputControl { void setClient() { isClient = true; } // Output side - void close(); + void abort(); void activateOutput(); void giveReadCredit(int32_t); @@ -64,7 +64,7 @@ class SslHandler : public OutputControl { void readbuff(SslIO& aio, SslIOBufferBase* buff); void eof(SslIO& aio); void disconnect(SslIO& aio); - + // Notifications void nobuffs(SslIO& aio); void idle(SslIO& aio); -- cgit v1.2.1