diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2012-05-21 23:18:50 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2012-05-21 23:18:50 +0000 |
| commit | 3d7f434dd0799d72bc7f3b651bdb1e2a0558630e (patch) | |
| tree | c4dad57be7e9d52bec1b92c1cce4e2d9d6008765 | |
| parent | 7cdd22ea4e5f32ed6cfb89000d55dd6c3be225bc (diff) | |
| download | qpid-python-3d7f434dd0799d72bc7f3b651bdb1e2a0558630e.tar.gz | |
QPID-2518: Qpid C++ broker can easily be blocked by client trying to connect over SSL port
Implement timed disconnect for TCP and for SSL/TCP mux
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1341262 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp | 34 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/AsynchIOHandler.h | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/SslPlugin.cpp | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp | 21 |
6 files changed, 75 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index cb9d61a40f..b550156c00 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -128,8 +128,9 @@ Broker::Options::Options(const std::string& name) : queueFlowResumeRatio(70), queueThresholdEventRatio(80), defaultMsgGroup("qpid.no-group"), - timestampRcvMsgs(false), // set the 0.10 timestamp delivery property - linkMaintenanceInterval(2) + timestampRcvMsgs(false), // set the 0.10 timestamp delivery property + linkMaintenanceInterval(2), + maxNegotiateTime(2000) // 2s { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -171,6 +172,7 @@ Broker::Options::Options(const std::string& name) : ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.") ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS")) + ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation") ; } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 1596e0632b..72ed05aacb 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -126,6 +126,7 @@ class Broker : public sys::Runnable, public Plugin::Target, std::string defaultMsgGroup; bool timestampRcvMsgs; double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values. + uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation private: std::string getHome(); diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index 5233002850..3edc896724 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -23,6 +23,7 @@ #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Socket.h" #include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/Timer.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/log/Statement.h" @@ -41,7 +42,25 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : +struct ProtocolTimeoutTask : public sys::TimerTask { + AsynchIOHandler& handler; + std::string id; + + ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) : + TimerTask(timeout, "ProtocolTimeout"), + handler(h), + id(i) + {} + + void fire() { + // If this fires it means that we didn't negotiate the connection in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection " << id << " No protocol received closing"); + handler.abort(); + } +}; + +AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) : identifier(id), aio(0), factory(f), @@ -57,9 +76,13 @@ AsynchIOHandler::~AsynchIOHandler() { delete codec; } -void AsynchIOHandler::init(AsynchIO* a, int numBuffs) { +void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) { aio = a; + // Start timer for this connection + timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this); + timer.add(timeoutTimerTask); + // Give connection some buffers to use for (int i = 0; i < numBuffs; i++) { aio->queueReadBuffer(new Buff); @@ -143,6 +166,9 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { framing::ProtocolInitiation protocolInit; if (protocolInit.decode(in)) { decoded = in.getPosition(); + // We've just got the protocol negotiation so we can cancel the timeout for that + timeoutTimerTask->cancel(); + QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); try { codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); @@ -202,6 +228,10 @@ void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { codec = factory->create(*this, identifier, SecuritySettings()); write(framing::ProtocolInitiation(codec->getVersion())); + // We've just sent the protocol negotiation so we can cancel the timeout for that + // This is not ideal, because we've not received anything yet, but heartbeats will + // be active soon + timeoutTimerTask->cancel(); return; } if (codec == 0) return; diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h index b9867606c4..2ddd5c9a90 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h @@ -27,6 +27,8 @@ #include "qpid/sys/Mutex.h" #include "qpid/CommonImportExport.h" +#include <boost/intrusive_ptr.hpp> + namespace qpid { namespace framing { @@ -38,6 +40,8 @@ namespace sys { class AsynchIO; struct AsynchIOBufferBase; class Socket; +class Timer; +class TimerTask; class AsynchIOHandler : public OutputControl { std::string identifier; @@ -49,13 +53,14 @@ class AsynchIOHandler : public OutputControl { AtomicValue<int32_t> readCredit; static const int32_t InfiniteCredit = -1; Mutex creditLock; + boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask; void write(const framing::ProtocolInitiation&); public: - QPID_COMMON_EXTERN AsynchIOHandler(std::string id, ConnectionCodec::Factory* f); + QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f ); QPID_COMMON_EXTERN ~AsynchIOHandler(); - QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs); + QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs); QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; } diff --git a/qpid/cpp/src/qpid/sys/SslPlugin.cpp b/qpid/cpp/src/qpid/sys/SslPlugin.cpp index 48baef9042..7cd5059570 100644 --- a/qpid/cpp/src/qpid/sys/SslPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/SslPlugin.cpp @@ -39,6 +39,8 @@ namespace qpid { namespace sys { +class Timer; + using namespace qpid::sys::ssl; struct SslServerOptions : ssl::SslOptions @@ -68,6 +70,8 @@ class SslProtocolFactoryTmpl : public ProtocolFactory { typedef SslAcceptorTmpl<T> SslAcceptor; + Timer& brokerTimer; + uint32_t maxNegotiateTime; const bool tcpNoDelay; T listener; const uint16_t listeningPort; @@ -75,7 +79,7 @@ class SslProtocolFactoryTmpl : public ProtocolFactory { bool nodict; public: - SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay); + SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, @@ -132,16 +136,18 @@ static struct SslPlugin : public Plugin { try { ssl::initNSS(options, true); nssInitialized = true; - + const broker::Broker::Options& opts = broker->getOptions(); ProtocolFactory::shared_ptr protocol(options.multiplex ? static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options, opts.connectionBacklog, - opts.tcpNoDelay)) : + opts.tcpNoDelay, + broker->getTimer(), opts.maxNegotiateTime)) : static_cast<ProtocolFactory*>(new SslProtocolFactory(options, opts.connectionBacklog, - opts.tcpNoDelay))); + opts.tcpNoDelay, + broker->getTimer(), opts.maxNegotiateTime))); QPID_LOG(notice, "Listening for " << (options.multiplex ? "SSL or TCP" : "SSL") << " connections on TCP port " << @@ -156,7 +162,9 @@ static struct SslPlugin : public Plugin { } sslPlugin; template <class T> -SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) : +SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) : + brokerTimer(timer), + maxNegotiateTime(maxTime), tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)), nodict(options.nodict) {} @@ -239,7 +247,7 @@ void SslMuxProtocolFactory::established(Poller::shared_ptr poller, const Socket& boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, 4); + async->init(aio, brokerTimer, maxNegotiateTime, 4); aio->start(poller); } diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp index bd10a5555a..551440f954 100644 --- a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -36,14 +36,21 @@ namespace qpid { namespace sys { +class Timer; + class AsynchIOProtocolFactory : public ProtocolFactory { - const bool tcpNoDelay; boost::ptr_vector<Socket> listeners; boost::ptr_vector<AsynchAcceptor> acceptors; + Timer& brokerTimer; + uint32_t maxNegotiateTime; uint16_t listeningPort; + const bool tcpNoDelay; public: - AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen); + AsynchIOProtocolFactory(const std::string& host, const std::string& port, + int backlog, bool nodelay, + Timer& timer, uint32_t maxTime, + bool shouldListen); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, const std::string& port, ConnectionCodec::Factory*, @@ -90,6 +97,7 @@ static class TCPIOPlugin : public Plugin { "", boost::lexical_cast<std::string>(opts.port), opts.connectionBacklog, opts.tcpNoDelay, + broker->getTimer(), opts.maxNegotiateTime, shouldListen)); if (shouldListen) { @@ -101,7 +109,12 @@ static class TCPIOPlugin : public Plugin { } } tcpPlugin; -AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) : +AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, + int backlog, bool nodelay, + Timer& timer, uint32_t maxTime, + bool shouldListen) : + brokerTimer(timer), + maxNegotiateTime(maxTime), tcpNoDelay(nodelay) { if (!shouldListen) { @@ -153,7 +166,7 @@ void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socke boost::bind(&AsynchIOHandler::nobuffs, async, _1), boost::bind(&AsynchIOHandler::idle, async, _1)); - async->init(aio, 4); + async->init(aio, brokerTimer, maxNegotiateTime, 4); aio->start(poller); } |
