diff options
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionCodec.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ProtocolAccess.h | 65 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ProtocolFactory.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 54 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/AsynchIO.cpp | 3 |
7 files changed, 117 insertions, 27 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index ca2bd7c93c..31974993bb 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -36,13 +36,14 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : +AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) : identifier(id), aio(0), factory(f), codec(0), readError(false), - isClient(false) + isClient(false), + access(a) {} AsynchIOHandler::~AsynchIOHandler() { @@ -152,7 +153,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { - codec = factory->create(*this, identifier); + codec = factory->create(*this, identifier, access); write(framing::ProtocolInitiation(codec->getVersion())); return; } diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index 530613367a..ece52f57c4 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -32,7 +32,7 @@ namespace framing { } namespace sys { - +class ProtocolAccess; class AsynchIOHandler : public OutputControl { std::string identifier; AsynchIO* aio; @@ -40,11 +40,12 @@ class AsynchIOHandler : public OutputControl { ConnectionCodec* codec; bool readError; bool isClient; + ProtocolAccess* access; void write(const framing::ProtocolInitiation&); public: - AsynchIOHandler(std::string id, ConnectionCodec::Factory* f); + AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0); ~AsynchIOHandler(); void init(AsynchIO* a, int numBuffs); diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h index 205596c709..4c5a68e576 100644 --- a/cpp/src/qpid/sys/ConnectionCodec.h +++ b/cpp/src/qpid/sys/ConnectionCodec.h @@ -28,9 +28,8 @@ namespace qpid { -namespace broker { class Broker; } - namespace sys { +class ProtocolAccess; /** * Interface of coder/decoder for a connection of a specific protocol @@ -70,7 +69,7 @@ class ConnectionCodec { /** Return "preferred" codec for outbound connections. */ virtual ConnectionCodec* create( - OutputControl&, const std::string& id + OutputControl&, const std::string& id, ProtocolAccess* a = 0 ) = 0; }; }; diff --git a/cpp/src/qpid/sys/ProtocolAccess.h b/cpp/src/qpid/sys/ProtocolAccess.h new file mode 100644 index 0000000000..433bf0ef97 --- /dev/null +++ b/cpp/src/qpid/sys/ProtocolAccess.h @@ -0,0 +1,65 @@ +#ifndef _sys_ProtocolAccess_h +#define _sys_ProtocolAccess_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "AsynchIO.h" +#include "AsynchIOHandler.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> + +namespace qpid { + +namespace broker +{ +class Connection; +} + +namespace sys { + +class ProtocolAccess +{ +public: + typedef boost::function0<void> Callback; + typedef boost::function2<void, int, std::string> ClosedCallback; + typedef boost::function1<void, boost::shared_ptr<broker::Connection> > SetConnCallback; + + ProtocolAccess (Callback ecb, ClosedCallback ccb, SetConnCallback sccb) + : aio(0), establishedCb(ecb), closedCb(ccb), setConnCb(sccb) {} + ~ProtocolAccess() {} + inline void close() { if (aio) aio->queueWriteClose(); } + + inline void setAio(AsynchIO *_aio) { aio = _aio; establishedCb(); } + inline void closedEof(AsynchIOHandler* async) { async->eof(*aio); closedCb(-1, "Closed by Peer"); } + inline void closed(int err, std::string str) { closedCb(err, str); } + inline void callConnCb(boost::shared_ptr<broker::Connection> c) { setConnCb(c); } + +private: + AsynchIO* aio; + Callback establishedCb; + ClosedCallback closedCb; + SetConnCallback setConnCb; +}; + +}} + +#endif //!_sys_ProtocolAccess_h diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h index 5f80771e49..e61a94b205 100644 --- a/cpp/src/qpid/sys/ProtocolFactory.h +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -25,7 +25,7 @@ #include <stdint.h> #include "qpid/SharedObject.h" #include "ConnectionCodec.h" - +#include "ProtocolAccess.h" namespace qpid { namespace sys { @@ -42,7 +42,8 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> virtual void connect( boost::shared_ptr<Poller>, const std::string& host, int16_t port, - ConnectionCodec::Factory* codec) = 0; + ConnectionCodec::Factory* codec, + ProtocolAccess* access = 0) = 0; }; inline ProtocolFactory::~ProtocolFactory() {} diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 045bc56e90..5d2cadbe03 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -41,13 +41,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory { public: AsynchIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, + ConnectionCodec::Factory*, ProtocolAccess*); uint16_t getPort() const; std::string getHost() const; private: - void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, + bool isClient, ProtocolAccess*); }; // Static instance to initialise plugin @@ -72,17 +74,32 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : {} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); + ConnectionCodec::Factory* f, bool isClient, + ProtocolAccess* a) { + AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a); + AsynchIO* aio; + if (isClient) async->setClient(); - AsynchIO* aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); + if (a == 0) + aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + else { + aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&ProtocolAccess::closedEof, a, async), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + a->setAio(aio); + } + async->init(aio, 4); aio->start(poller); } @@ -95,26 +112,31 @@ std::string AsynchIOProtocolFactory::getHost() const { return listener.getSockname(); } -void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { +void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, + ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false, + (ProtocolAccess*) 0))); acceptor->start(poller); } void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, - ConnectionCodec::Factory* f) + ConnectionCodec::Factory* fact, + ProtocolAccess* access) { // Note that the following logic does not cause a memory leak. // The allocated Socket is freed either by the AsynchConnector // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. + Socket* socket = new Socket(); - new AsynchConnector(*socket, poller, host, port, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, f, true)); + new AsynchConnector (*socket, poller, host, port, + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access), + boost::bind(&ProtocolAccess::closed, access, _1, _2)); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp index 9dcb841992..470db4c614 100644 --- a/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -130,7 +130,7 @@ void AsynchConnector::connComplete(DispatchHandle& h) h.stopWatch(); if (errCode == 0) { connCallback(socket); - DispatchHandle::doDelete(); + DispatchHandle::doDelete(); } else { failure(errCode, std::string(strerror(errCode))); } @@ -148,6 +148,7 @@ void AsynchConnector::failure(int errCode, std::string message) } /* +>>>>>>> .r654667 * Asynch reader/writer */ AsynchIO::AsynchIO(const Socket& s, |
