summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/TCPIOPlugin.cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-20 18:42:46 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-20 18:42:46 +0000
commit5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (patch)
treef24776684c025fbed6a0431bf3d6811f0a1aae7a /cpp/src/qpid/sys/TCPIOPlugin.cpp
parent718ff5b34dd1e87eb79fa4c61fec668d1dc33103 (diff)
downloadqpid-python-5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3.tar.gz
Merge trunk to QPID-2519 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1186990 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp80
1 files changed, 55 insertions, 25 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index a6528f9ad9..85d8c1db87 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -25,31 +25,31 @@
#include "qpid/Plugin.h"
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/Poller.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-#include <memory>
+#include <boost/ptr_container/ptr_vector.hpp>
namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
const bool tcpNoDelay;
- Socket listener;
- const uint16_t listeningPort;
- std::auto_ptr<AsynchAcceptor> acceptor;
+ boost::ptr_vector<Socket> listeners;
+ boost::ptr_vector<AsynchAcceptor> acceptors;
+ uint16_t listeningPort;
public:
- AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
ConnectFailedCallback);
uint16_t getPort() const;
- std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
@@ -61,23 +61,49 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
static class TCPIOPlugin : public Plugin {
void earlyInitialize(Target&) {
}
-
+
void initialize(Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog,
- opts.tcpNoDelay));
- QPID_LOG(notice, "Listening on TCP port " << protocol->getPort());
- broker->registerProtocolFactory("tcp", protocol);
+ ProtocolFactory::shared_ptr protocolt(
+ new AsynchIOProtocolFactory(
+ "", boost::lexical_cast<std::string>(opts.port),
+ opts.connectionBacklog,
+ opts.tcpNoDelay));
+ QPID_LOG(notice, "Listening on TCP/TCP6 port " << protocolt->getPort());
+ broker->registerProtocolFactory("tcp", protocolt);
}
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
- tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
-{}
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay) :
+ tcpNoDelay(nodelay)
+{
+ SocketAddress sa(host, port);
+
+ // We must have at least one resolved address
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+
+ listeningPort = lport;
+
+ // Try any other resolved addresses
+ while (sa.nextAddress()) {
+ // Hack to ensure that all listening connections are on the same port
+ sa.setAddrInfoPort(listeningPort);
+ QPID_LOG(info, "Listening to: " << sa.asString())
+ Socket* s = new Socket;
+ uint16_t lport = s->listen(sa, backlog);
+ QPID_LOG(debug, "Listened to: " << lport);
+ listeners.push_back(s);
+ }
+
+}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
@@ -107,16 +133,14 @@ uint16_t AsynchIOProtocolFactory::getPort() const {
return listeningPort; // Immutable no need for lock.
}
-std::string AsynchIOProtocolFactory::getHost() const {
- return listener.getSockname();
-}
-
void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
- acceptor.reset(
- AsynchAcceptor::create(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
- acceptor->start(poller);
+ for (unsigned i = 0; i<listeners.size(); ++i) {
+ acceptors.push_back(
+ AsynchAcceptor::create(listeners[i],
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ acceptors[i].start(poller);
+ }
}
void AsynchIOProtocolFactory::connectFailed(
@@ -130,7 +154,7 @@ void AsynchIOProtocolFactory::connectFailed(
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
- const std::string& host, int16_t port,
+ const std::string& host, const std::string& port,
ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
@@ -139,8 +163,8 @@ void AsynchIOProtocolFactory::connect(
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
-
Socket* socket = new Socket();
+ try {
AsynchConnector* c = AsynchConnector::create(
*socket,
host,
@@ -150,6 +174,12 @@ void AsynchIOProtocolFactory::connect(
boost::bind(&AsynchIOProtocolFactory::connectFailed,
this, _1, _2, _3, failed));
c->start(poller);
+ } catch (std::exception&) {
+ // TODO: Design question - should we do the error callback and also throw?
+ int errCode = socket->getError();
+ connectFailed(*socket, errCode, strError(errCode), failed);
+ throw;
+ }
}
}} // namespace qpid::sys