summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-07-27 17:19:30 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-07-27 17:19:30 +0000
commit65ea2f177bd0810590895d89a490af8cea60253b (patch)
tree1a1432d706ac5f43dc8cdd5fdb0d7b5566dd5d06 /cpp/src/qpid/sys/AsynchIOAcceptor.cpp
parent0a7f3f5dde40e59e90588e4ab7ba2ba99651c0f4 (diff)
downloadqpid-python-65ea2f177bd0810590895d89a490af8cea60253b.tar.gz
* Asynchronous network IO subsystem
- This is now implemented such that it very nearly only depends on the platform code (Socker & Poller), this is not 100% true at present, but should be simple to finish. - This is still not the default (use "./configure --disable-apr-netio" to get it) - Interrupting the broker gives a known error - Default for number of broker io threads is not correct (needs to be number of CPUs - it will run slower with too many io threads) * EventChannel code - Deleted all EventChannel code as it's entirely superceded by this new shiny code ;-) * Rearranged the platform Socket implementations a bit for better abstraction git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOAcceptor.cpp')
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp308
1 files changed, 308 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
new file mode 100644
index 0000000000..bf4a3ff842
--- /dev/null
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Acceptor.h"
+
+#include "Socket.h"
+#include "AsynchIO.h"
+#include "Mutex.h"
+#include "Thread.h"
+
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+#include <boost/assert.hpp>
+#include <queue>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class AsynchIOAcceptor : public Acceptor {
+ Poller::shared_ptr poller;
+ Socket listener;
+ int numIOThreads;
+ const uint16_t listeningPort;
+
+public:
+ AsynchIOAcceptor(int16_t port, int backlog, int threads, bool trace);
+ ~AsynchIOAcceptor() {}
+ void run(ConnectionInputHandlerFactory* factory);
+ void shutdown();
+
+ uint16_t getPort() const;
+ std::string getHost() const;
+
+private:
+ void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
+};
+
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
+{
+ return
+ Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads, trace));
+}
+
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads, bool) :
+ poller(new Poller),
+ numIOThreads(threads),
+ listeningPort(listener.listen(port, backlog))
+{}
+
+// Buffer definition
+struct Buff : public AsynchIO::Buffer {
+ Buff() :
+ AsynchIO::Buffer(new char[65536], 65536)
+ {}
+ ~Buff()
+ { delete [] bytes;}
+};
+
+class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+ AsynchIO* aio;
+ ConnectionInputHandler* inputHandler;
+ std::queue<framing::AMQFrame> frameQueue;
+ Mutex frameQueueLock;
+ bool frameQueueClosed;
+ bool initiated;
+
+public:
+ AsynchIOHandler() :
+ inputHandler(0),
+ frameQueueClosed(false),
+ initiated(false)
+ {}
+
+ ~AsynchIOHandler() {
+ if (inputHandler)
+ inputHandler->closed();
+ delete inputHandler;
+ }
+
+ void init(AsynchIO* a, ConnectionInputHandler* h) {
+ aio = a;
+ inputHandler = h;
+ }
+
+ // Output side
+ void send(framing::AMQFrame&);
+ void close();
+
+ // Input side
+ void readbuff(AsynchIO& aio, AsynchIO::Buffer* buff);
+ void eof(AsynchIO& aio);
+ void disconnect(AsynchIO& aio);
+
+ // Notifications
+ void nobuffs(AsynchIO& aio);
+ void idle(AsynchIO& aio);
+ void closedSocket(AsynchIO& aio, const Socket& s);
+};
+
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
+
+ AsynchIOHandler* async = new AsynchIOHandler;
+ ConnectionInputHandler* handler = f->create(async);
+ AsynchIO* aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, handler);
+
+ // Give connection some buffers to use
+ for (int i = 0; i < 4; i++) {
+ aio->queueReadBuffer(new Buff);
+ }
+ aio->start(poller);
+}
+
+
+uint16_t AsynchIOAcceptor::getPort() const {
+ return listeningPort; // Immutable no need for lock.
+}
+
+std::string AsynchIOAcceptor::getHost() const {
+ return listener.getSockname();
+}
+
+void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+ Dispatcher d(poller);
+ AsynchAcceptor
+ acceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
+ acceptor.start(poller);
+
+ std::vector<Thread*> t(numIOThreads-1);
+
+ // Run n-1 io threads
+ for (int i=0; i<numIOThreads-1; ++i)
+ t[i] = new Thread(d);
+
+ // Run final thread
+ d.run();
+
+ // Now wait for n-1 io threads to exit
+ for (int i=0; i>numIOThreads-1; ++i) {
+ t[i]->join();
+ delete t[i];
+ }
+}
+
+void AsynchIOAcceptor::shutdown() {
+ poller->shutdown();
+}
+
+// Output side
+void AsynchIOHandler::send(framing::AMQFrame& frame) {
+ // TODO: Need to find out if we are in the callback context,
+ // in the callback thread if so we can go further than just queuing the frame
+ // to be handled later
+ {
+ ScopedLock<Mutex> l(frameQueueLock);
+ // Ignore anything seen after closing
+ if (!frameQueueClosed)
+ frameQueue.push(frame);
+ }
+
+ // Activate aio for writing here
+ aio->queueWrite();
+}
+
+void AsynchIOHandler::close() {
+ ScopedLock<Mutex> l(frameQueueLock);
+ frameQueueClosed = true;
+}
+
+// Input side
+void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::Buffer* buff) {
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+ if(initiated){
+ framing::AMQFrame frame;
+ try{
+ while(frame.decode(in)) {
+ QPID_LOG(debug, "RECV: " << frame);
+ inputHandler->received(frame);
+ }
+ }catch(const std::exception& e){
+ QPID_LOG(error, e.what());
+ }
+ }else{
+ framing::ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ QPID_LOG(debug, "INIT [" << aio << "]");
+ inputHandler->initiated(protocolInit);
+ initiated = true;
+ }
+ }
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (in.available() != 0) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += buff->dataCount-in.available();
+ buff->dataCount = in.available();
+ aio->unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio->queueReadBuffer(buff);
+ }
+}
+
+void AsynchIOHandler::eof(AsynchIO&) {
+ inputHandler->closed();
+ aio->queueWriteClose();
+}
+
+void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
+ delete &s;
+ aio->queueForDeletion();
+ delete this;
+}
+
+void AsynchIOHandler::disconnect(AsynchIO& a) {
+ // treat the same as eof
+ eof(a);
+}
+
+// Notifications
+void AsynchIOHandler::nobuffs(AsynchIO&) {
+}
+
+void AsynchIOHandler::idle(AsynchIO&){
+ ScopedLock<Mutex> l(frameQueueLock);
+
+ if (frameQueue.empty()) {
+ // At this point we know that we're write idling the connection
+ // so we could note that somewhere or do something special
+ return;
+ }
+
+ // Try and get a queued buffer if not then construct new one
+ AsynchIO::Buffer* buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ std::auto_ptr<framing::Buffer> out(new framing::Buffer(buff->bytes, buff->byteCount));
+ int buffUsed = 0;
+
+ while (!frameQueue.empty()) {
+ framing::AMQFrame frame = frameQueue.front();
+ frameQueue.pop();
+
+ // Encode output frame
+ int frameSize = frame.size();
+ if (frameSize > buff->byteCount)
+ THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+
+ // If we've filled current buffer then flush and get new one
+ if (frameSize > int(out->available())) {
+ buff->dataCount = buffUsed;
+ aio->queueWrite(buff);
+
+ buff = aio->getQueuedBuffer();
+ if (!buff)
+ buff = new Buff;
+ out.reset(new framing::Buffer(buff->bytes, buff->byteCount));
+ buffUsed = 0;
+ }
+
+ frame.encode(*out);
+ buffUsed += frameSize;
+ QPID_LOG(debug, "SENT: " << frame);
+ }
+
+ buff->dataCount = buffUsed;
+ aio->queueWrite(buff);
+
+ if (frameQueueClosed) {
+ aio->queueWriteClose();
+ }
+
+}
+
+}} // namespace qpid::sys