diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-04-22 21:13:20 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-22 21:13:20 +0000 |
| commit | 52ffbbfef04ee479a341ff640cb2df9c41897963 (patch) | |
| tree | b9cecb2bde45ad7b2c118d23e3d9c1f2c74560cf /cpp/src/qpid/sys | |
| parent | 63d88fa5ff3407f26590bdf6b5a956d3307f677e (diff) | |
| download | qpid-python-52ffbbfef04ee479a341ff640cb2df9c41897963.tar.gz | |
* Renamed the Acceptor class to be the ProtocolFactory class
which better approximates its current behaviour
* Slightly refactored TCPIOPlugin to better approximate how it would look
when we implement a proper AsynchConnector
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@650657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/ProtocolFactory.h (renamed from cpp/src/qpid/sys/Acceptor.h) | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 50 |
2 files changed, 28 insertions, 36 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/ProtocolFactory.h index 69a6eb8d7c..5f80771e49 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -1,5 +1,5 @@ -#ifndef _sys_Acceptor_h -#define _sys_Acceptor_h +#ifndef _sys_ProtocolFactory_h +#define _sys_ProtocolFactory_h /* * @@ -32,23 +32,23 @@ namespace sys { class Poller; -class Acceptor : public qpid::SharedObject<Acceptor> +class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> { public: - virtual ~Acceptor() = 0; + virtual ~ProtocolFactory() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; + virtual void accept(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; virtual void connect( boost::shared_ptr<Poller>, const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0; }; -inline Acceptor::~Acceptor() {} +inline ProtocolFactory::~ProtocolFactory() {} }} -#endif /*!_sys_Acceptor_h*/ +#endif //!_sys_ProtocolFactory_h diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index eb6bcb3dee..65ea380b07 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -19,7 +19,7 @@ * */ -#include "Acceptor.h" +#include "ProtocolFactory.h" #include "AsynchIOHandler.h" #include "AsynchIO.h" @@ -33,21 +33,21 @@ namespace qpid { namespace sys { -class AsynchIOAcceptor : public Acceptor { +class AsynchIOProtocolFactory : public ProtocolFactory { Socket listener; const uint16_t listeningPort; std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog); - void run(Poller::shared_ptr, ConnectionCodec::Factory*); + AsynchIOProtocolFactory(int16_t port, int backlog); + void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); uint16_t getPort() const; std::string getHost() const; private: - void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); }; // Static instance to initialise plugin @@ -56,24 +56,26 @@ static class TCPIOPlugin : public Plugin { } void initialize(Target& target) { - broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); // Only provide to a Broker if (broker) { const broker::Broker::Options& opts = broker->getOptions(); - Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port, opts.connectionBacklog)); - QPID_LOG(info, "Listening on TCP port " << acceptor->getPort()); - broker->registerAccepter(acceptor); + ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog)); + QPID_LOG(info, "Listening on TCP port " << protocol->getPort()); + broker->registerProtocolFactory(protocol); } } -} acceptor; +} tcpPlugin; -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : +AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : listeningPort(listener.listen(port, backlog)) {} -void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) { +void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, + ConnectionCodec::Factory* f, bool isClient) { AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); + if (isClient) + async->setClient(); AsynchIO* aio = new AsynchIO(s, boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), boost::bind(&AsynchIOHandler::eof, async, _1), @@ -85,40 +87,30 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn aio->start(poller); } - -uint16_t AsynchIOAcceptor::getPort() const { +uint16_t AsynchIOProtocolFactory::getPort() const { return listeningPort; // Immutable no need for lock. } -std::string AsynchIOAcceptor::getHost() const { +std::string AsynchIOProtocolFactory::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { +void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); acceptor->start(poller); } -void AsynchIOAcceptor::connect( +void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); - AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f); - async->setClient(); - AsynchIO* aio = new AsynchIO(*socket, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, 4); - aio->start(poller); + + established(poller, *socket, f, true); } }} // namespace qpid::sys |
