diff options
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ConnectionCodec.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ProtocolAccess.h | 65 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/ProtocolFactory.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Socket.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 45 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/posix/Socket.cpp | 7 |
8 files changed, 31 insertions, 105 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 31974993bb..ca2bd7c93c 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -36,14 +36,13 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) : +AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : identifier(id), aio(0), factory(f), codec(0), readError(false), - isClient(false), - access(a) + isClient(false) {} AsynchIOHandler::~AsynchIOHandler() { @@ -153,7 +152,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { - codec = factory->create(*this, identifier, access); + codec = factory->create(*this, identifier); write(framing::ProtocolInitiation(codec->getVersion())); return; } diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index ece52f57c4..7448094a94 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -32,7 +32,6 @@ namespace framing { } namespace sys { -class ProtocolAccess; class AsynchIOHandler : public OutputControl { std::string identifier; AsynchIO* aio; @@ -40,12 +39,11 @@ class AsynchIOHandler : public OutputControl { ConnectionCodec* codec; bool readError; bool isClient; - ProtocolAccess* access; void write(const framing::ProtocolInitiation&); public: - AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0); + AsynchIOHandler(std::string id, ConnectionCodec::Factory* f); ~AsynchIOHandler(); void init(AsynchIO* a, int numBuffs); diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h index 4c5a68e576..efc6839b60 100644 --- a/cpp/src/qpid/sys/ConnectionCodec.h +++ b/cpp/src/qpid/sys/ConnectionCodec.h @@ -29,7 +29,6 @@ namespace qpid { namespace sys { -class ProtocolAccess; /** * Interface of coder/decoder for a connection of a specific protocol @@ -69,7 +68,7 @@ class ConnectionCodec { /** Return "preferred" codec for outbound connections. */ virtual ConnectionCodec* create( - OutputControl&, const std::string& id, ProtocolAccess* a = 0 + OutputControl&, const std::string& id ) = 0; }; }; diff --git a/cpp/src/qpid/sys/ProtocolAccess.h b/cpp/src/qpid/sys/ProtocolAccess.h deleted file mode 100644 index 433bf0ef97..0000000000 --- a/cpp/src/qpid/sys/ProtocolAccess.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef _sys_ProtocolAccess_h -#define _sys_ProtocolAccess_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "AsynchIO.h" -#include "AsynchIOHandler.h" -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> - -namespace qpid { - -namespace broker -{ -class Connection; -} - -namespace sys { - -class ProtocolAccess -{ -public: - typedef boost::function0<void> Callback; - typedef boost::function2<void, int, std::string> ClosedCallback; - typedef boost::function1<void, boost::shared_ptr<broker::Connection> > SetConnCallback; - - ProtocolAccess (Callback ecb, ClosedCallback ccb, SetConnCallback sccb) - : aio(0), establishedCb(ecb), closedCb(ccb), setConnCb(sccb) {} - ~ProtocolAccess() {} - inline void close() { if (aio) aio->queueWriteClose(); } - - inline void setAio(AsynchIO *_aio) { aio = _aio; establishedCb(); } - inline void closedEof(AsynchIOHandler* async) { async->eof(*aio); closedCb(-1, "Closed by Peer"); } - inline void closed(int err, std::string str) { closedCb(err, str); } - inline void callConnCb(boost::shared_ptr<broker::Connection> c) { setConnCb(c); } - -private: - AsynchIO* aio; - Callback establishedCb; - ClosedCallback closedCb; - SetConnCallback setConnCb; -}; - -}} - -#endif //!_sys_ProtocolAccess_h diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h index e61a94b205..e8eaefe1f6 100644 --- a/cpp/src/qpid/sys/ProtocolFactory.h +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -25,7 +25,7 @@ #include <stdint.h> #include "qpid/SharedObject.h" #include "ConnectionCodec.h" -#include "ProtocolAccess.h" +#include <boost/function.hpp> namespace qpid { namespace sys { @@ -43,7 +43,7 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> boost::shared_ptr<Poller>, const std::string& host, int16_t port, ConnectionCodec::Factory* codec, - ProtocolAccess* access = 0) = 0; + boost::function2<void, int, std::string> failed) = 0; }; inline ProtocolFactory::~ProtocolFactory() {} diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index 806d6b5164..f95d841b39 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -118,6 +118,7 @@ public: private: Socket(IOHandlePrivate*); + mutable std::string connectname; }; }} diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 5d2cadbe03..e82a6a9102 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -42,14 +42,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory { AsynchIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, int16_t port, - ConnectionCodec::Factory*, ProtocolAccess*); + ConnectionCodec::Factory*, + boost::function2<void, int, std::string> failed); uint16_t getPort() const; std::string getHost() const; private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, - bool isClient, ProtocolAccess*); + bool isClient); }; // Static instance to initialise plugin @@ -74,31 +75,18 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : {} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient, - ProtocolAccess* a) { - AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a); - AsynchIO* aio; + ConnectionCodec::Factory* f, bool isClient) { + AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); if (isClient) async->setClient(); - if (a == 0) - aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - else { - aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&ProtocolAccess::closedEof, a, async), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - a->setAio(aio); - } + AsynchIO* aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); async->init(aio, 4); aio->start(poller); @@ -116,8 +104,7 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false, - (ProtocolAccess*) 0))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); acceptor->start(poller); } @@ -125,7 +112,7 @@ void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, ConnectionCodec::Factory* fact, - ProtocolAccess* access) + boost::function2<void, int, std::string> failed) { // Note that the following logic does not cause a memory leak. // The allocated Socket is freed either by the AsynchConnector @@ -135,8 +122,8 @@ void AsynchIOProtocolFactory::connect( Socket* socket = new Socket(); new AsynchConnector (*socket, poller, host, port, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access), - boost::bind(&ProtocolAccess::closed, access, _1, _2)); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true), + failed); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 67f6b6db4c..f4320531a9 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -32,6 +32,7 @@ #include <netdb.h> #include <cstdlib> #include <string.h> +#include <iostream> #include <boost/format.hpp> @@ -138,6 +139,10 @@ const char* h_errstr(int e) { void Socket::connect(const std::string& host, int port) const { + std::stringstream namestream; + namestream << host << ":" << port; + connectname = namestream.str(); + const int& socket = impl->fd; struct sockaddr_in name; name.sin_family = AF_INET; @@ -240,6 +245,8 @@ std::string Socket::getPeername() const std::string Socket::getPeerAddress() const { + if (!connectname.empty()) + return std::string (connectname); return getName(impl->fd, false, true); } |
