From 65ea2f177bd0810590895d89a490af8cea60253b Mon Sep 17 00:00:00 2001
From: Andrew Stitcher
Date: Fri, 27 Jul 2007 17:19:30 +0000
Subject: * Asynchronous network IO subsystem

  - This is now implemented such that it very nearly only depends on the
    platform code (Socket & Poller); this is not 100% true at present, but
    should be simple to finish.
  - This is still not the default (use "./configure --disable-apr-netio" to
    get it)
  - Interrupting the broker gives a known error
  - Default for the number of broker io threads is not correct (it needs to
    be the number of CPUs - it will run slower with too many io threads)

* EventChannel code
  - Deleted all EventChannel code as it's entirely superseded by this new
    shiny code ;-)

* Rearranged the platform Socket implementations a bit for better abstraction

git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560323 13f79535-47bb-0310-9956-ffa450edef68
---
 cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 308 ++++++++++++++++++++++++++++++++++
 1 file changed, 308 insertions(+)
 create mode 100644 cpp/src/qpid/sys/AsynchIOAcceptor.cpp

diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
new file mode 100644
index 0000000000..bf4a3ff842
--- /dev/null
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Acceptor.h"
+
+#include "Socket.h"
+#include "AsynchIO.h"
+#include "Mutex.h"
+#include "Thread.h"
+
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+#include <queue>
+#include <vector>
+#include <memory>
+#include <exception>
+
+namespace qpid {
+namespace sys {
+
+class AsynchIOAcceptor : public Acceptor {
+    Poller::shared_ptr poller;
+    Socket listener;
+    int numIOThreads;
+    const uint16_t listeningPort;
+
+public:
+    AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace);
+    ~AsynchIOAcceptor() {}
+    void run(ConnectionInputHandlerFactory* factory);
+    void shutdown();
+
+    uint16_t getPort() const;
+    std::string getHost() const;
+
+private:
+    void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
+};
+
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+{
+    return
+        Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace));
+}
+
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) :
+    poller(new Poller),
+    numIOThreads(threads),
+    listeningPort(listener.listen(port, backlog))
+{}
+
+// Buffer definition
+struct Buff : public AsynchIO::Buffer {
+    Buff() :
+        AsynchIO::Buffer(new char[65536], 65536)
+    {}
+    ~Buff()
+    { delete [] bytes; }
+};
+
+class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+    AsynchIO* aio;
+    ConnectionInputHandler* inputHandler;
+    std::queue<framing::AMQFrame> frameQueue;
+    Mutex frameQueueLock;
+    bool frameQueueClosed;
+    bool initiated;
+
+public:
+    AsynchIOHandler() :
+        inputHandler(0),
+        frameQueueClosed(false),
+        initiated(false)
+    {}
+
+    ~AsynchIOHandler() {
+        if (inputHandler)
+            inputHandler->closed();
+        delete inputHandler;
+    }
+
+    void init(AsynchIO* a, ConnectionInputHandler* h) {
+        aio = a;
+        inputHandler = h;
+    }
+
+    // Output side
+    void send(framing::AMQFrame&);
+    void close();
+
+    // Input side
+    void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff);
+    void eof(AsynchIO& aio);
+    void disconnect(AsynchIO& aio);
+
+    // Notifications
+    void nobuffs(AsynchIO& aio);
+    void idle(AsynchIO& aio);
+    void closedSocket(AsynchIO& aio, const Socket& s);
+};
+
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
+    AsynchIOHandler* async = new AsynchIOHandler;
+    ConnectionInputHandler* handler = f->create(async);
+    AsynchIO* aio = new AsynchIO(s,
+        boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+        boost::bind(&AsynchIOHandler::eof, async, _1),
+        boost::bind(&AsynchIOHandler::disconnect, async, _1),
+        boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+        boost::bind(&AsynchIOHandler::idle, async, _1));
+    async->init(aio, handler);
+
+    // Give the connection some buffers to use
+    for (int i = 0; i < 4; i++) {
+        aio->queueReadBuffer(new Buff);
+    }
+    aio->start(poller);
+}
+
+uint16_t AsynchIOAcceptor::getPort() const {
+    return listeningPort; // Immutable, no need for a lock.
+}
+
+std::string AsynchIOAcceptor::getHost() const {
+    return listener.getSockname();
+}
+
+void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+    Dispatcher d(poller);
+    AsynchAcceptor
+        acceptor(listener,
+                 boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
+    acceptor.start(poller);
+
+    std::vector<Thread*> t(numIOThreads-1);
+
+    // Run n-1 io threads; the final io thread is this one, running the
+    // dispatcher directly
+    for (int i=0; i<numIOThreads-1; ++i)
+        t[i] = new Thread(d);
+    d.run();
+
+    // Wait for the other io threads to exit
+    for (int i=0; i<numIOThreads-1; ++i) {
+        t[i]->join();
+        delete t[i];
+    }
+}
+
+void AsynchIOAcceptor::shutdown() {
+    poller->shutdown();
+}
+
+// Output side
+void AsynchIOHandler::send(framing::AMQFrame& frame) {
+    // TODO: Need to find out if we are in the callback thread;
+    // if so we can go further than just queuing the frame to be handled later
+    {
+        ScopedLock<Mutex> l(frameQueueLock);
+        // Ignore anything seen after closing
+        if (!frameQueueClosed)
+            frameQueue.push(frame);
+    }
+    // Activate aio for writing here
+    aio->queueWrite();
+}
+
+void AsynchIOHandler::close() {
+    ScopedLock<Mutex> l(frameQueueLock);
+    frameQueueClosed = true;
+}
+
+// Input side
+void AsynchIOHandler::readbuff(AsynchIO&, AsynchIO::Buffer* buff) {
+    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+    if (initiated) {
+        framing::AMQFrame frame;
+        try {
+            while (frame.decode(in)) {
+                QPID_LOG(debug, "RECV: " << frame);
+                inputHandler->received(frame);
+            }
+        } catch (const std::exception& e) {
+            QPID_LOG(error, e.what());
+        }
+    } else {
+        framing::ProtocolInitiation protocolInit;
+        if (protocolInit.decode(in)) {
+            QPID_LOG(debug, "INIT [" << aio << "]");
+            inputHandler->initiated(protocolInit);
+            initiated = true;
+        }
+    }
+    // TODO: unreading needs to go away, and when we can cope
+    // with multiple sub-buffers in the general buffer scheme, it will
+    if (in.available() != 0) {
+        // Adjust buffer for used bytes and then "unread" them
+        buff->dataStart += buff->dataCount-in.available();
+        buff->dataCount = in.available();
+        aio->unread(buff);
+    } else {
+        // Give whole buffer back to aio subsystem
+        aio->queueReadBuffer(buff);
+    }
+}
+
+void AsynchIOHandler::eof(AsynchIO&) {
+    inputHandler->closed();
+    aio->queueWriteClose();
+}
+
+void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
+    delete &s;
+    aio->queueForDeletion();
+    delete this;
+}
+
+void AsynchIOHandler::disconnect(AsynchIO& a) {
+    // Treat the same as eof
+    eof(a);
+}
+
+// Notifications
+void AsynchIOHandler::nobuffs(AsynchIO&) {
+}
+
+void AsynchIOHandler::idle(AsynchIO&) {
+    ScopedLock<Mutex> l(frameQueueLock);
+
+    if (frameQueue.empty()) {
+        // At this point we know that we're write idling the connection
+        // so we could note that somewhere or do something special
+        return;
+    }
+
+    // Try to get a queued buffer; if there isn't one then construct a new one
+    AsynchIO::Buffer* buff = aio->getQueuedBuffer();
+    if (!buff)
+        buff = new Buff;
+    std::auto_ptr<framing::Buffer> out(new framing::Buffer(buff->bytes, buff->byteCount));
+    int buffUsed = 0;
+
+    while (!frameQueue.empty()) {
+        framing::AMQFrame frame = frameQueue.front();
+        frameQueue.pop();
+
+        // Encode output frame
+        int frameSize = frame.size();
+        if (frameSize > buff->byteCount)
+            THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+
+        // If we've filled the current buffer then flush it and get a new one
+        if (frameSize > int(out->available())) {
+            buff->dataCount = buffUsed;
+            aio->queueWrite(buff);
+
+            buff = aio->getQueuedBuffer();
+            if (!buff)
+                buff = new Buff;
+            out.reset(new framing::Buffer(buff->bytes, buff->byteCount));
+            buffUsed = 0;
+        }
+
+        frame.encode(*out);
+        buffUsed += frameSize;
+        QPID_LOG(debug, "SENT: " << frame);
+    }
+
+    buff->dataCount = buffUsed;
+    aio->queueWrite(buff);
+
+    if (frameQueueClosed) {
+        aio->queueWriteClose();
+    }
+
+}
+
+}} // namespace qpid::sys
--
cgit v1.2.1
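
For context, here is a minimal sketch (not part of the patch) of how the Acceptor interface added above is meant to be driven by the broker. Only Acceptor::create(), run() and shutdown() come from the code in this patch; the include paths, the runBroker helper and the port/backlog/thread values are illustrative assumptions.

    // Sketch only: the factory is the broker's own
    // ConnectionInputHandlerFactory implementation; values are illustrative.
    #include "Acceptor.h"
    #include "qpid/sys/ConnectionInputHandlerFactory.h"

    using namespace qpid::sys;

    void runBroker(ConnectionInputHandlerFactory& factory) {
        // Signature from this patch: create(port, backlog, ioThreads, trace)
        Acceptor::shared_ptr acceptor =
            Acceptor::create(5672, 10, 4 /* ideally the number of CPUs */, false);

        // Blocks here, dispatching network io on ioThreads threads in total
        // (the calling thread runs the Dispatcher as well).
        acceptor->run(&factory);
    }

    // To stop, another thread calls acceptor->shutdown(), which shuts the
    // Poller down and lets run() return.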