diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-10 16:15:08 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-10 16:15:08 +0000 |
| commit | 085486ebe5ff21133b9caf1c31625ac6ea356568 (patch) | |
| tree | 7acbe9ca99a345dca71f9f80cd3e29ea4e3710f0 /qpid/cpp | |
| parent | 60c62c03ca404e98e4fbd1abf4a5ebf50763d604 (diff) | |
| parent | e2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 (diff) | |
| download | qpid-python-085486ebe5ff21133b9caf1c31625ac6ea356568.tar.gz | |
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
42 files changed, 488 insertions, 84 deletions
diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt index 29a52cbb49..aaa4203c1e 100644 --- a/qpid/cpp/CMakeLists.txt +++ b/qpid/cpp/CMakeLists.txt @@ -182,6 +182,11 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL SunPro) set (HIDE_SYMBOL_FLAGS "") endif (CMAKE_CXX_COMPILER_ID STREQUAL SunPro) +# XL is IBM XL C/C++ +if (CMAKE_CXX_COMPILER_ID MATCHES XL) + set (COMPILER_FLAGS "-qtls -qrtti") +endif (CMAKE_CXX_COMPILER_ID MATCHES XL) + if (CMAKE_SYSTEM_NAME STREQUAL Windows) # Allow MSVC user to select 'WinXP-SP3/Windows Server 2003' as build target version set (win32_winnt_default OFF) diff --git a/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake b/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake new file mode 100644 index 0000000000..fb515cd149 --- /dev/null +++ b/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake @@ -0,0 +1,59 @@ +# +# $Id $ +# +# Author(s): Anton Deguet +# Created on: 2011 +# +# (C) Copyright 2011 Johns Hopkins University (JHU), All Rights +# Reserved. +# +# --- begin cisst license - do not edit --- +# +# This software is provided "as is" under an open source license, with +# no warranty. The complete license can be found in license.txt and +# http://www.cisst.org/cisst/license.txt. +# +# --- end cisst license --- + +function (check_size_t_native_type VARIABLE) + # make sure we don't test over and over + if ("${VARIABLE}" MATCHES "^${VARIABLE}$") + message (STATUS "Checking to see if size_t is a native type") + set (SOURCE + "#include <vector> + char method(unsigned int p) { + return 'u'; + } + char method(unsigned long long int p) { + return 'l'; + } + char method(size_t p) { + return 's'; + } + int main(void) {}") + + file (WRITE + "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test_size_t.cpp" + "${SOURCE}\n") + + try_compile (${VARIABLE} + ${CMAKE_BINARY_DIR} + "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test_size_t.cpp" + OUTPUT_VARIABLE OUTPUT) + + # report using message and log files + if (${VARIABLE}) + message (STATUS "Checking to see if size_t is a native type - yes") + file (APPEND ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeOutput.log + "Determining if size_t is a native type passed with " + "the following output:\n${OUTPUT}\n\n") + else (${VARIABLE}) + message (STATUS "Checking to see if size_t is a native type - no") + file (APPEND ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeError.log + "Determining if size_t is a native type passed with " + "the following output:\n${OUTPUT}\n\n") + endif (${VARIABLE}) + + endif ("${VARIABLE}" MATCHES "^${VARIABLE}$") + +endfunction (check_size_t_native_type VARIABLE) diff --git a/qpid/cpp/INSTALL b/qpid/cpp/INSTALL index 0e91cddec8..717c9b0908 100644 --- a/qpid/cpp/INSTALL +++ b/qpid/cpp/INSTALL @@ -91,7 +91,7 @@ Or if you have only have a command line environment available 2.1 Building as C++11 (Experimental) ==================================== -Currently the Qpoid project uses C++ that conforms to the C++03 standard, as currently +Currently the Qpid project uses C++ that conforms to the C++03 standard, as currently this is the C++ standard that is supported the most widely, so any new code must also compile as C++03. As an experiment (and to support a few extra platforms) the Qpid code will also now build as C++11. @@ -141,6 +141,23 @@ If you want to use the ports version of cyrus-sasl then you should also add: Which will allow cmake to find libraries installed in /usr/local (which is where cyrus-sasl gets installed by ports). +2.4 Building on AIX +=================== +Qpid has been tested on AIX 7.1 with XL C++ 13.1 and Boost 1.55.0. The +thread-using variant of the compiler must be used but it isn't the default +picked up by cmake. Thus, the compiler must be specified at cmake time. +For example (assuming PATH includes the compiler binaries): + + # CXX=xlC_r CC=cc_r cmake .. + +Warnings from Boost header files are expected and can be ignored. + +It is normal to see (lots of) multiply-defined symbol warnings when linking +the shared libraries built as part of Qpid. + +The mktemp package must be installed separately in order to execute the +Qpid test suite. + 3. Building a Repository Working Copy ===================================== To get the source code from the subversion repository (trunk) do: diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 3e5165dfb0..8263534614 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -42,6 +42,7 @@ include(CheckIncludeFiles) include(CheckIncludeFileCXX) include(CheckLibraryExists) include(CheckSymbolExists) +include(CheckSizeTNativeType) find_package(PkgConfig) find_package(Ruby) @@ -350,6 +351,8 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) mark_as_advanced(QPID_POLLER) endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) +check_size_t_native_type (QPID_SIZE_T_NATIVE) + option(BUILD_SASL "Build with Cyrus SASL support" ${SASL_FOUND}) if (BUILD_SASL) if (NOT SASL_FOUND) @@ -684,6 +687,12 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) ) endif (CMAKE_SYSTEM_NAME STREQUAL SunOS) + if (CMAKE_SYSTEM_NAME STREQUAL AIX) + set (qpid_system_module + qpid/sys/aix/SystemInfo.cpp + ) + endif (CMAKE_SYSTEM_NAME STREQUAL AIX) + if (CMAKE_CXX_COMPILER_ID STREQUAL SunPro) # -lmalloc needed for mallinfo. set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lmalloc") @@ -1145,6 +1154,9 @@ set_target_properties (qpidbroker PROPERTIES VERSION ${qpidbroker_version} SOVERSION ${qpidbroker_version_major} COMPILE_DEFINITIONS _IN_QPID_BROKER) +if (CMAKE_CXX_COMPILER_ID MATCHES XL) + set_target_properties (qpidbroker PROPERTIES LINK_FLAGS -Wl,-bbigtoc) +endif (CMAKE_CXX_COMPILER_ID MATCHES XL) if (MSVC) set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290) diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake index 777fc1b893..478b369eb6 100644 --- a/qpid/cpp/src/config.h.cmake +++ b/qpid/cpp/src/config.h.cmake @@ -56,6 +56,7 @@ #cmakedefine HAVE_SYS_SDT_H ${HAVE_SYS_SDT_H} #cmakedefine HAVE_LOG_AUTHPRIV #cmakedefine HAVE_LOG_FTP +#cmakedefine QPID_SIZE_T_NATIVE #cmakedefine HAVE_PROTON_TRACER #cmakedefine USE_PROTON_TRANSPORT_CONDITION #cmakedefine HAVE_PROTON_EVENTS diff --git a/qpid/cpp/src/qpid/InlineAllocator.h b/qpid/cpp/src/qpid/InlineAllocator.h index 2502545dcb..28ea73ec12 100644 --- a/qpid/cpp/src/qpid/InlineAllocator.h +++ b/qpid/cpp/src/qpid/InlineAllocator.h @@ -47,7 +47,7 @@ class InlineAllocator : public BaseAllocator { InlineAllocator() : allocated(false) {} InlineAllocator(const InlineAllocator& x) : BaseAllocator(x), allocated(false) {} - pointer allocate(size_type n) { + pointer allocate(size_type n, std::allocator<void>::const_pointer = 0) { if (n <= Max && !allocated) { allocated=true; return reinterpret_cast<value_type*>(address()); diff --git a/qpid/cpp/src/qpid/NullSaslServer.cpp b/qpid/cpp/src/qpid/NullSaslServer.cpp index 40bd9ebbc6..9d560c8e68 100644 --- a/qpid/cpp/src/qpid/NullSaslServer.cpp +++ b/qpid/cpp/src/qpid/NullSaslServer.cpp @@ -66,7 +66,6 @@ NullSaslServer::Status NullSaslServer::start(const std::string& mechanism, const NullSaslServer::Status NullSaslServer::step(const std::string* /*response*/, std::string& /*challenge*/) { - assert(false); return FAIL; } std::string NullSaslServer::getMechanisms() diff --git a/qpid/cpp/src/qpid/Options.cpp b/qpid/cpp/src/qpid/Options.cpp index cdba53449a..5ca91e6bd4 100644 --- a/qpid/cpp/src/qpid/Options.cpp +++ b/qpid/cpp/src/qpid/Options.cpp @@ -16,6 +16,7 @@ * */ +#include "config.h" #include "qpid/Options.h" #include "qpid/OptionsTemplates.h" #include "qpid/Exception.h" @@ -145,6 +146,9 @@ template QPID_COMMON_EXTERN po::value_semantic* create_value(int64_t& val, const template QPID_COMMON_EXTERN po::value_semantic* create_value(uint16_t& val, const std::string& arg); template QPID_COMMON_EXTERN po::value_semantic* create_value(uint32_t& val, const std::string& arg); template QPID_COMMON_EXTERN po::value_semantic* create_value(uint64_t& val, const std::string& arg); +#ifdef QPID_SIZE_T_NATIVE +template QPID_COMMON_EXTERN po::value_semantic* create_value(size_t& val, const std::string& arg); +#endif template QPID_COMMON_EXTERN po::value_semantic* create_value(double& val, const std::string& arg); template QPID_COMMON_EXTERN po::value_semantic* create_value(string& val, const std::string& arg); diff --git a/qpid/cpp/src/qpid/RangeSet.h b/qpid/cpp/src/qpid/RangeSet.h index 78677f1263..20ee722fcb 100644 --- a/qpid/cpp/src/qpid/RangeSet.h +++ b/qpid/cpp/src/qpid/RangeSet.h @@ -96,9 +96,9 @@ class Range { */ template <class T> class RangeSet - : boost::additive1<RangeSet<T>, - boost::additive2<RangeSet<T>, Range<T>, - boost::additive2<RangeSet<T>, T> > > + : private boost::additive1<RangeSet<T>, + boost::additive2<RangeSet<T>, Range<T>, + boost::additive2<RangeSet<T>, T> > > { typedef InlineVector<Range<T>, 3> Ranges; // TODO aconway 2008-04-21: what's the optimial inlined value? diff --git a/qpid/cpp/src/qpid/RefCounted.h b/qpid/cpp/src/qpid/RefCounted.h index 26e3e2c4ba..c2ec367658 100644 --- a/qpid/cpp/src/qpid/RefCounted.h +++ b/qpid/cpp/src/qpid/RefCounted.h @@ -33,7 +33,7 @@ namespace qpid { * to the class that has mixed this in not the class itself (as that would sidestep * the reference counting) */ -class RefCounted : boost::noncopyable { +class RefCounted : private boost::noncopyable { mutable boost::detail::atomic_count count; public: diff --git a/qpid/cpp/src/qpid/SessionId.h b/qpid/cpp/src/qpid/SessionId.h index e18b360999..d950ad9d1a 100644 --- a/qpid/cpp/src/qpid/SessionId.h +++ b/qpid/cpp/src/qpid/SessionId.h @@ -39,7 +39,7 @@ namespace qpid { * The name must be unique among sessions with the same authentication * principal. */ -class SessionId : boost::totally_ordered1<SessionId> { +class SessionId : private boost::totally_ordered1<SessionId> { std::string userId; std::string name; public: diff --git a/qpid/cpp/src/qpid/amqp/MapEncoder.h b/qpid/cpp/src/qpid/amqp/MapEncoder.h index 42fb819932..1481f9125a 100644 --- a/qpid/cpp/src/qpid/amqp/MapEncoder.h +++ b/qpid/cpp/src/qpid/amqp/MapEncoder.h @@ -31,7 +31,7 @@ struct Descriptor; /** * Encode map like data */ -class MapEncoder : public MapHandler, Encoder +class MapEncoder : public MapHandler, private Encoder { public: MapEncoder(char* data, size_t size); diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.h b/qpid/cpp/src/qpid/broker/LossyLvq.h index e0a266ab77..2665ebe49f 100644 --- a/qpid/cpp/src/qpid/broker/LossyLvq.h +++ b/qpid/cpp/src/qpid/broker/LossyLvq.h @@ -28,6 +28,12 @@ namespace qpid { namespace broker { class MessageMap; +// Disable inherited-by-dominance warning on MSVC. We know. It's ok. +#ifdef _MSC_VER +# pragma warning(push) +# pragma warning(disable : 4250) +#endif + /** * Combination of LossyQueue and Lvq behaviours. */ @@ -36,6 +42,11 @@ class LossyLvq : public Lvq, public LossyQueue public: LossyLvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); }; + +#ifdef _MSC_VER +# pragma warning(pop) +#endif + }} // namespace qpid::broker #endif /*!QPID_BROKER_LOSSYLVQ_H*/ diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 0eede36f3f..af4e8e50fb 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -44,7 +44,7 @@ class OwnershipToken; * are deleted when and only when they are no longer in use. * */ -class QueueRegistry : QueueFactory { +class QueueRegistry : private QueueFactory { public: QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0); QPID_BROKER_EXTERN ~QueueRegistry(); diff --git a/qpid/cpp/src/qpid/broker/TopicKeyNode.h b/qpid/cpp/src/qpid/broker/TopicKeyNode.h index 7671ed069d..ac760b198e 100644 --- a/qpid/cpp/src/qpid/broker/TopicKeyNode.h +++ b/qpid/cpp/src/qpid/broker/TopicKeyNode.h @@ -166,7 +166,7 @@ class QPID_BROKER_CLASS_EXTERN TopicKeyNode { bool isHash; // children - typedef std::map<const std::string, typename TopicKeyNode::shared_ptr> ChildMap; + typedef std::map<std::string, typename TopicKeyNode::shared_ptr> ChildMap; ChildMap childTokens; typename TopicKeyNode::shared_ptr starChild; // "*" subtree typename TopicKeyNode::shared_ptr hashChild; // "#" subtree diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 3a93e2aac5..8d6516edee 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -121,7 +121,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker connection(pn_connection()), transport(pn_transport()), collector(0), - out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false) + out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false), ioRequested(false) { #ifdef HAVE_PROTON_EVENTS collector = pn_collector(); @@ -157,6 +157,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker void Connection::requestIO() { + ioRequested = true; out.activateOutput(); } @@ -179,13 +180,24 @@ size_t Connection::decode(const char* buffer, size_t size) { QPID_LOG(trace, id << " decode(" << size << ")"); if (size == 0) return 0; - //TODO: Fix pn_engine_input() to take const buffer + ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size); if (n > 0 || n == PN_EOS) { - //If engine returns EOS, have no way of knowing how many bytes - //it processed, but can assume none need to be reprocessed so - //consider them all read: - if (n == PN_EOS) n = size; + // PN_EOS either means we received a Close (which also means we've + // consumed all the input), OR some Very Bad Thing happened and this + // connection is toast. + if (n == PN_EOS) + { + std::string error; + if (checkTransportError(error)) { + // "He's dead, Jim." + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + out.abort(); + return 0; + } else { + n = size; // assume all consumed + } + } QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size); try { process(); @@ -209,7 +221,11 @@ size_t Connection::decode(const char* buffer, size_t size) } return n; } else if (n == PN_ERR) { - throw Exception(qpid::amqp::error_conditions::DECODE_ERROR, QPID_MSG("Error on input: " << getError())); + std::string error; + checkTransportError(error); + QPID_LOG_CAT(error, network, id << " connection error: " << error); + out.abort(); + return 0; } else { return 0; } @@ -224,8 +240,21 @@ size_t Connection::encode(char* buffer, size_t size) QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) haveOutput = true; return n; + } else if (n == PN_EOS) { + haveOutput = false; + // Normal close, or error? + std::string error; + if (checkTransportError(error)) { + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + out.abort(); + } + return 0; } else if (n == PN_ERR) { - throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError())); + std::string error; + checkTransportError(error); + QPID_LOG_CAT(error, network, id << " connection error: " << error); + out.abort(); + return 0; } else { haveOutput = false; return 0; @@ -291,6 +320,7 @@ bool Connection::canEncode() } else { QPID_LOG(info, "Connection " << id << " has been closed locally"); } + if (ioRequested.valueCompareAndSwap(true, false)) haveOutput = true; pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput) return haveOutput; @@ -303,8 +333,22 @@ void Connection::open() pn_connection_set_container(connection, getBroker().getFederationTag().c_str()); uint32_t timeout = pn_transport_get_remote_idle_timeout(transport); if (timeout) { - ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(timeout, getBroker().getTimer(), *this)); - pn_transport_set_idle_timeout(transport, timeout); + // if idle generate empty frames at 1/2 the timeout interval as keepalives: + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask((timeout+1)/2, + getBroker().getTimer(), + *this)); + getBroker().getTimer().add(ticker); + + // Note: in version 0-10 of the protocol, idle timeout applies to both + // ends. AMQP 1.0 changes that - it's now asymmetric: each end can + // configure/disable it independently. For backward compatibility, by + // default mimic the old behavior and set our local timeout. + // Use 2x the remote's timeout, as per the spec the remote should + // advertise 1/2 its actual timeout threshold + pn_transport_set_idle_timeout(transport, timeout * 2); + QPID_LOG_CAT(debug, network, id << " AMQP 1.0 idle-timeout set:" + << " local=" << pn_transport_get_idle_timeout(transport) + << " remote=" << pn_transport_get_remote_idle_timeout(transport)); } pn_connection_open(connection); @@ -585,4 +629,22 @@ void Connection::doDeliveryUpdated(pn_delivery_t *delivery) } } +// check for failures of the transport: +bool Connection::checkTransportError(std::string& text) +{ + std::stringstream info; + +#ifdef USE_PROTON_TRANSPORT_CONDITION + pn_condition_t* tcondition = pn_transport_condition(transport); + if (pn_condition_is_set(tcondition)) + info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition); +#else + pn_error_t* terror = pn_transport_error(transport); + if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]"; +#endif + + text = info.str(); + return !text.empty(); +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index ea4ce06163..e97d041c03 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -24,6 +24,7 @@ #include "qpid/sys/ConnectionCodec.h" #include "qpid/broker/amqp/BrokerContext.h" #include "qpid/broker/amqp/ManagedConnection.h" +#include "qpid/sys/AtomicValue.h" #include <map> #include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -80,6 +81,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man bool closeInitiated; bool closeRequested; boost::intrusive_ptr<sys::TimerTask> ticker; + qpid::sys::AtomicValue<bool> ioRequested; virtual void process(); void doOutput(size_t); @@ -92,6 +94,8 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man void closedByManagement(); private: + bool checkTransportError(std::string&); + // handle Proton engine events void doConnectionRemoteOpen(); void doConnectionRemoteClose(); @@ -100,7 +104,6 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man void doLinkRemoteOpen(pn_link_t *link); void doLinkRemoteClose(pn_link_t *link); void doDeliveryUpdated(pn_delivery_t *delivery); - }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.h b/qpid/cpp/src/qpid/broker/amqp/Filter.h index e12e9e412b..c246fb9ede 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Filter.h +++ b/qpid/cpp/src/qpid/broker/amqp/Filter.h @@ -37,7 +37,7 @@ struct QueueSettings; namespace amqp { class Outgoing; -class Filter : qpid::amqp::MapReader +class Filter : private qpid::amqp::MapReader { public: Filter(); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 0225ee74cb..ed931c90fb 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -548,8 +548,8 @@ Subscription::Subscription(const Address& address, const std::string& type) if ((Opt(address)/LINK).hasKey(TIMEOUT)) { const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32()); - } else if (durable && !reliable && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) { - //if durable but not reliable, and auto-delete not + } else if (durable && !AddressResolution::is_reliable(address) && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) { + //if durable, not explicitly reliable, and auto-delete not //explicitly set, then set a non-zero default for the //autodelete timeout queueOptions.setInt("qpid.auto_delete_timeout", 2*60); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index e300ddae8b..2ca2c85c64 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -164,8 +164,13 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout) ScopedRelease release(inUse, lock); sys::Mutex::ScopedUnlock l(lock); //wait for suitable new message to arrive - if (process(&handler, get_duration(timeout, deadline))) { + switch (process(&handler, get_duration(timeout, deadline))) { + case OK: return true; + case CLOSED: + return false; + case EMPTY: + break; } } if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed"); @@ -233,7 +238,7 @@ void IncomingMessages::releaseAll() } //then pump out any available messages from incoming queue... GetAny handler; - while (process(&handler, 0)) ; + while (process(&handler, 0) == OK) ; //now release all messages sys::Mutex::ScopedLock l(lock); acceptTracker.release(session); @@ -242,7 +247,7 @@ void IncomingMessages::releaseAll() void IncomingMessages::releasePending(const std::string& destination) { //first pump all available messages from incoming to received... - while (process(0, 0)) ; + while (process(0, 0) == OK) ; //now remove all messages for this destination from received list, recording their ids... sys::Mutex::ScopedLock l(lock); @@ -269,7 +274,7 @@ bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration ti * that are not accepted by the handler are pushed onto received queue * for later retrieval. */ -bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) +IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) { AbsTime deadline(AbsTime::now(), duration); FrameSet::shared_ptr content; @@ -282,7 +287,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) } else if (handler && handler->accept(transfer)) { QPID_LOG(debug, "Delivered " << *content->getMethod() << " " << *content->getHeaders()); - return true; + return OK; } else { //received message for another destination, keep for later QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); @@ -295,8 +300,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) } } } - catch (const qpid::ClosedException&) {} // Just return false if queue closed. - return false; + catch (const qpid::ClosedException&) { return CLOSED; } + return EMPTY; } bool IncomingMessages::wait(qpid::sys::Duration duration) @@ -331,7 +336,7 @@ uint32_t IncomingMessages::pendingAccept(const std::string& destination) uint32_t IncomingMessages::available() { //first pump all available messages from incoming to received... - while (process(0, 0)) {} + while (process(0, 0) == OK) {} //return the count of received messages sys::Mutex::ScopedLock l(lock); return received.size(); @@ -340,7 +345,7 @@ uint32_t IncomingMessages::available() uint32_t IncomingMessages::available(const std::string& destination) { //first pump all available messages from incoming to received... - while (process(0, 0)) {} + while (process(0, 0) == OK) {} //count all messages for this destination from received list sys::Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index c9ea0673a3..4c9ee68ece 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -87,6 +87,7 @@ class IncomingMessages uint32_t available(const std::string& destination); private: typedef std::deque<FrameSetPtr> FrameSetQueue; + enum ProcessState {EMPTY=0,OK=1,CLOSED=2}; sys::Monitor lock; qpid::client::AsyncSession session; @@ -95,7 +96,7 @@ class IncomingMessages FrameSetQueue received; AcceptTracker acceptTracker; - bool process(Handler*, qpid::sys::Duration); + ProcessState process(Handler*, qpid::sys::Duration); bool wait(qpid::sys::Duration); bool pop(FrameSetPtr&, qpid::sys::Duration); diff --git a/qpid/cpp/src/qpid/client/windows/SslConnector.cpp b/qpid/cpp/src/qpid/client/windows/SslConnector.cpp index d0be818df0..dc82ece9d1 100644 --- a/qpid/cpp/src/qpid/client/windows/SslConnector.cpp +++ b/qpid/cpp/src/qpid/client/windows/SslConnector.cpp @@ -115,7 +115,9 @@ SslConnector::SslConnector(boost::shared_ptr<qpid::sys::Poller> p, ConnectionImpl* cimpl) : TCPConnector(p, ver, settings, cimpl), shim(0), poller(p) { - + if (settings.sslIgnoreHostnameVerificationFailure) { + sslCredential.ignoreHostnameVerificationFailure(); + } const std::string& name = (settings.sslCertName != "") ? settings.sslCertName : qpid::sys::ssl::SslOptions::global.certName; certLoaded = sslCredential.load(name); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index fedab4286f..a0b16c2b4c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -39,6 +39,7 @@ #include "qpid/sys/SecurityLayer.h" #include "qpid/sys/SystemInfo.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/sys/urlAdd.h" #include "config.h" #include <boost/lexical_cast.hpp> @@ -95,6 +96,27 @@ std::string get_error(pn_connection_t* connection, pn_transport_t* transport) } #endif +class ConnectionTickerTask : public qpid::sys::TimerTask +{ + qpid::sys::Timer& timer; + ConnectionContext& connection; + public: + ConnectionTickerTask(const qpid::sys::Duration& interval, qpid::sys::Timer& t, ConnectionContext& c) : + TimerTask(interval, "ConnectionTicker"), + timer(t), + connection(c) + {} + + void fire() { + QPID_LOG(debug, "ConnectionTickerTask fired"); + // Setup next firing + setupNextFire(); + timer.add(this); + + // Send Ticker + connection.activateOutput(); + } +}; } void ConnectionContext::trace(const char* message) const @@ -118,23 +140,15 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types:: // Concatenate all known URLs into a single URL, get rid of duplicate addresses. sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ? qpid::Address::TCP : protocol); - if (pn_transport_bind(engine, connection)) { - //error - } if (identifier.empty()) { identifier = qpid::types::Uuid(true).str(); } - pn_connection_set_container(connection, identifier.c_str()); - bool enableTrace(false); - QPID_LOG_TEST_CAT(trace, protocol, enableTrace); - if (enableTrace) { - pn_transport_trace(engine, PN_TRACE_FRM); - set_tracer(engine, this); - } + configureConnection(); } ConnectionContext::~ConnectionContext() { + if (ticker) ticker->cancel(); close(); sessions.clear(); pn_transport_free(engine); @@ -218,6 +232,10 @@ void ConnectionContext::close() lock.wait(); } } + if (ticker) { + ticker->cancel(); + ticker.reset(); + } } bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -498,7 +516,7 @@ uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> rece void ConnectionContext::activateOutput() { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - wakeupDriver(); + if (state == CONNECTED) wakeupDriver(); } /** * Expects lock to be held by caller @@ -530,14 +548,11 @@ void ConnectionContext::reset() engine = pn_transport(); connection = pn_connection(); - pn_connection_set_container(connection, identifier.c_str()); - bool enableTrace(false); - QPID_LOG_TEST_CAT(trace, protocol, enableTrace); - if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM); + configureConnection(); + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { i->second->reset(connection); } - pn_transport_bind(engine, connection); } void ConnectionContext::check() { @@ -758,16 +773,31 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) //TODO: Fix pn_engine_input() to take const buffer ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size); if (n > 0 || n == PN_EOS) { - //If engine returns EOS, have no way of knowing how many bytes - //it processed, but can assume none need to be reprocessed so - //consider them all read: - if (n == PN_EOS) n = size; + // PN_EOS either means we received a Close (which also means we've + // consumed all the input), OR some Very Bad Thing happened and this + // connection is toast. + if (n == PN_EOS) + { + std::string error; + if (checkTransportError(error)) { + // "He's dead, Jim." + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + transport->abort(); + return 0; + } else { + n = size; // assume all consumed + } + } QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) - pn_transport_tick(engine, 0); + pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); lock.notifyAll(); return n; } else if (n == PN_ERR) { - throw MessagingException(QPID_MSG("Error on input: " << getError())); + std::string error; + checkTransportError(error); + QPID_LOG_CAT(error, network, id << " connection error: " << error); + transport->abort(); + return 0; } else { return 0; } @@ -792,10 +822,20 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) if (notifyOnWrite) lock.notifyAll(); return n; } else if (n == PN_ERR) { - throw MessagingException(QPID_MSG("Error on output: " << getError())); + std::string error; + checkTransportError(error); + QPID_LOG_CAT(error, network, id << " connection error: " << error); + transport->abort(); + return 0; } else if (n == PN_EOS) { haveOutput = false; - return 0;//Is this right? + // Normal close, or error? + std::string error; + if (checkTransportError(error)) { + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + transport->abort(); + } + return 0; } else { haveOutput = false; return 0; @@ -804,6 +844,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) bool ConnectionContext::canEncodePlain() { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); return haveOutput && state == CONNECTED; } void ConnectionContext::closed() @@ -1061,7 +1102,6 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { } QPID_LOG(debug, id << " Opening..."); - setProperties(); pn_connection_open(connection); wakeupDriver(); //want to write while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) && @@ -1071,6 +1111,25 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { throw qpid::messaging::ConnectionError("Failed to open connection"); } + + // Connection open - check for idle timeout from the remote and start a + // periodic tick to monitor for idle connections + pn_timestamp_t remote = pn_transport_get_remote_idle_timeout(engine); + pn_timestamp_t local = pn_transport_get_idle_timeout(engine); + uint64_t shortest = ((remote && local) + ? std::min(remote, local) + : (remote) ? remote : local); + if (shortest) { + // send an idle frame at least twice before timeout + shortest = (shortest + 1)/2; + qpid::sys::Duration d(shortest * qpid::sys::TIME_MSEC); + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(d, driver->getTimer(), *this)); + driver->getTimer().add(ticker); + QPID_LOG(debug, id << " AMQP 1.0 idle-timeout set:" + << " local=" << pn_transport_get_idle_timeout(engine) + << " remote=" << pn_transport_get_remote_idle_timeout(engine)); + } + QPID_LOG(debug, id << " Opened"); return restartSessions(); @@ -1151,4 +1210,44 @@ bool ConnectionContext::CodecAdapter::canEncode() } +// setup the transport and connection objects: +void ConnectionContext::configureConnection() +{ + pn_connection_set_container(connection, identifier.c_str()); + setProperties(); + if (heartbeat) { + // fail an idle connection at 2 x heartbeat (in msecs) + pn_transport_set_idle_timeout(engine, heartbeat*2*1000); + } + + bool enableTrace(false); + QPID_LOG_TEST_CAT(trace, protocol, enableTrace); + if (enableTrace) { + pn_transport_trace(engine, PN_TRACE_FRM); + set_tracer(engine, this); + } + + int err = pn_transport_bind(engine, connection); + if (err) + QPID_LOG(error, id << " Error binding connection and transport: " << err); +} + + +// check for failures of the transport: +bool ConnectionContext::checkTransportError(std::string& text) +{ + std::stringstream info; + +#ifdef USE_PROTON_TRANSPORT_CONDITION + pn_condition_t* tcondition = pn_transport_condition(engine); + if (pn_condition_is_set(tcondition)) + info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition); +#else + pn_error_t* terror = pn_transport_error(engine); + if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]"; +#endif + + text = info.str(); + return !text.empty(); +} }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 12f3c21e0a..80da9dff10 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -25,6 +25,7 @@ #include <map> #include <memory> #include <string> +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> #include "qpid/Url.h" #include "qpid/messaging/ConnectionOptions.h" @@ -47,6 +48,7 @@ class ProtocolVersion; namespace sys { class SecurityLayer; struct SecuritySettings; +class TimerTask; } namespace messaging { class Duration; @@ -120,7 +122,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void initSecurityLayer(qpid::sys::SecurityLayer&); void trace(const char*) const; - private: + private: typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap; class CodecAdapter : public qpid::sys::Codec { @@ -155,6 +157,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag std::auto_ptr<Sasl> sasl; CodecAdapter codecAdapter; bool notifyOnWrite; + boost::intrusive_ptr<qpid::sys::TimerTask> ticker; void check(); bool checkDisconnected(); @@ -191,6 +194,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag std::string getError(); bool useSasl(); void setProperties(); + void configureConnection(); + bool checkTransportError(std::string&); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp index 16307b3c22..ebe3fff1cb 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp @@ -22,13 +22,14 @@ #include "Transport.h" #include "qpid/messaging/exceptions.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace messaging { namespace amqp { -DriverImpl::DriverImpl() : poller(new qpid::sys::Poller) +DriverImpl::DriverImpl() : poller(new qpid::sys::Poller), timer(new qpid::sys::Timer) { start(); } @@ -48,6 +49,7 @@ void DriverImpl::stop() QPID_LOG(debug, "Driver stopped"); poller->shutdown(); thread.join(); + timer->stop(); } boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection) diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h index 354fa1ae35..36cb196343 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h +++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h @@ -29,6 +29,7 @@ namespace qpid { namespace sys { class Poller; +class Timer; } namespace messaging { namespace amqp { @@ -47,11 +48,14 @@ class DriverImpl void stop(); boost::shared_ptr<Transport> getTransport(const std::string& protocol, TransportContext& connection); + sys::Timer& getTimer() { return *timer; } static boost::shared_ptr<DriverImpl> getDefault(); private: boost::shared_ptr<qpid::sys::Poller> poller; qpid::sys::Thread thread; + std::auto_ptr<sys::Timer> timer; + static qpid::sys::Mutex defaultLock; static boost::weak_ptr<DriverImpl> theDefault; }; diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transport.h b/qpid/cpp/src/qpid/messaging/amqp/Transport.h index 159916f9ae..6ec99ab58f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/Transport.h +++ b/qpid/cpp/src/qpid/messaging/amqp/Transport.h @@ -40,6 +40,7 @@ class Transport : public qpid::sys::OutputControl virtual ~Transport() {} virtual void connect(const std::string& host, const std::string& port) = 0; virtual void close() = 0; + virtual void abort() = 0; virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0; typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>); diff --git a/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp b/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp index ad47fd98d9..5dbc13175f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp @@ -94,6 +94,9 @@ void SslTransport::negotiationDone(SECURITY_STATUS status) SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : TcpTransport(c, p) { const ConnectionOptions* options = context.getOptions(); + if (options->sslIgnoreHostnameVerificationFailure) { + sslCredential.ignoreHostnameVerificationFailure(); + } const std::string& name = (options->sslCertName != "") ? options->sslCertName : qpid::sys::ssl::SslOptions::global.certName; certLoaded = sslCredential.load(name); diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.h b/qpid/cpp/src/qpid/store/MessageStorePlugin.h index 1fcde6683d..5290fc16db 100644 --- a/qpid/cpp/src/qpid/store/MessageStorePlugin.h +++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.h @@ -267,7 +267,7 @@ class MessageStorePlugin : }; StoreOptions options; - typedef std::map<const std::string, StorageProvider*> ProviderMap; + typedef std::map<std::string, StorageProvider*> ProviderMap; ProviderMap providers; ProviderMap::const_iterator provider; diff --git a/qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp b/qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp index 4e4a1289d1..076056b2af 100644 --- a/qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp +++ b/qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp @@ -40,5 +40,5 @@ void uuid_generate (uint8_t out[qpid::sys::UuidSize]) out[7] = (uuid.time_hi_and_version & 0x00ff); out[8] = uuid.clock_seq_hi_and_reserved; out[9] = uuid.clock_seq_low; - ::memcpy(&out[10], &uuid.node, _UUID_NODE_LEN); + ::memcpy(&out[10], &uuid.node, sizeof(uuid.node)); } diff --git a/qpid/cpp/src/qpid/sys/Thread.h b/qpid/cpp/src/qpid/sys/Thread.h index f556612908..cb0a1adce6 100644 --- a/qpid/cpp/src/qpid/sys/Thread.h +++ b/qpid/cpp/src/qpid/sys/Thread.h @@ -34,6 +34,8 @@ # define QPID_TSS __thread #elif defined (__SUNPRO_CC) # define QPID_TSS __thread +#elif defined (__IBMCPP__) +# define QPID_TSS __thread #else # error "Dont know how to define QPID_TSS for this platform" #endif diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index eebf02e867..7d04d2214d 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -93,7 +93,7 @@ private: AsynchAcceptor::AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback) : acceptedCallback(callback), - handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), + handle((const IOHandle&)s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0), socket(s) { s.setNonblocking(); @@ -167,7 +167,7 @@ AsynchConnector::AsynchConnector(const Socket& s, const std::string& port, ConnectedCallback connCb, FailedCallback failCb) : - DispatchHandle(s, + DispatchHandle((const IOHandle&)s, 0, boost::bind(&AsynchConnector::connComplete, this, _1), boost::bind(&AsynchConnector::connComplete, this, _1)), @@ -308,7 +308,7 @@ AsynchIO::AsynchIO(const Socket& s, ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb, ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) : - DispatchHandle(s, + DispatchHandle((const IOHandle&)s, boost::bind(&AsynchIO::readable, this, _1), boost::bind(&AsynchIO::writeable, this, _1), boost::bind(&AsynchIO::disconnected, this, _1)), diff --git a/qpid/cpp/src/qpid/sys/posix/Thread.cpp b/qpid/cpp/src/qpid/sys/posix/Thread.cpp index 2075e1dfbc..349e35d643 100644 --- a/qpid/cpp/src/qpid/sys/posix/Thread.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Thread.cpp @@ -59,7 +59,7 @@ Thread::operator bool() { } bool Thread::operator==(const Thread& t) const { - return ::pthread_equal(impl->thread, t.impl->thread) != 0; + return pthread_equal(impl->thread, t.impl->thread) != 0; } bool Thread::operator!=(const Thread& t) const { diff --git a/qpid/cpp/src/qpid/sys/posix/Time.cpp b/qpid/cpp/src/qpid/sys/posix/Time.cpp index c44640b3e0..10a5d944b1 100644 --- a/qpid/cpp/src/qpid/sys/posix/Time.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Time.cpp @@ -24,6 +24,7 @@ #include "qpid/sys/Time.h" #include <ostream> #include <istream> +#include <sstream> #include <time.h> #include <stdio.h> #include <sys/time.h> @@ -106,13 +107,14 @@ std::istream& operator>>(std::istream& i, Duration& d) { if (i.eof() || std::isspace(i.peek())) // No suffix d = int64_t(number*TIME_SEC); else { - std::string suffix; - i >> suffix; + std::stringbuf suffix; + i >> &suffix; if (i.fail()) return i; - if (suffix.compare("s") == 0) d = int64_t(number*TIME_SEC); - else if (suffix.compare("ms") == 0) d = int64_t(number*TIME_MSEC); - else if (suffix.compare("us") == 0) d = int64_t(number*TIME_USEC); - else if (suffix.compare("ns") == 0) d = int64_t(number*TIME_NSEC); + std::string suffix_str = suffix.str(); + if (suffix_str.compare("s") == 0) d = int64_t(number*TIME_SEC); + else if (suffix_str.compare("ms") == 0) d = int64_t(number*TIME_MSEC); + else if (suffix_str.compare("us") == 0) d = int64_t(number*TIME_USEC); + else if (suffix_str.compare("ns") == 0) d = int64_t(number*TIME_NSEC); else i.setstate(std::ios::failbit); } return i; diff --git a/qpid/cpp/src/qpid/sys/unordered_map.h b/qpid/cpp/src/qpid/sys/unordered_map.h index 22cedad299..1b27770804 100644 --- a/qpid/cpp/src/qpid/sys/unordered_map.h +++ b/qpid/cpp/src/qpid/sys/unordered_map.h @@ -23,7 +23,7 @@ #if defined(_MSC_VER) || defined(_LIBCPP_VERSION) || __cplusplus >= 201103L # include <unordered_map> -#elif defined(__SUNPRO_CC) +#elif defined(__SUNPRO_CC) || defined(__IBMCPP__) # include <boost/tr1/unordered_map.hpp> #else # include <tr1/unordered_map> diff --git a/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp b/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp index 667f0f1ef0..de8f10b0e9 100644 --- a/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp +++ b/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp @@ -34,7 +34,7 @@ namespace sys { namespace windows { -SslCredential::SslCredential() : certStore(0), cert(0) +SslCredential::SslCredential() : certStore(0), cert(0), hostnameVerification(true) { SecInvalidateHandle(&credHandle); memset(&cred, 0, sizeof(cred)); @@ -60,6 +60,8 @@ bool SslCredential::load(const std::string& certName) cred.paCred = &cert; cred.cCreds = 1; } + if (!hostnameVerification) + cred.dwFlags |= SCH_CRED_NO_SERVERNAME_CHECK; SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL, UNISP_NAME, @@ -89,6 +91,10 @@ std::string SslCredential::error() return loadError.error; } +void SslCredential::ignoreHostnameVerificationFailure(){ + hostnameVerification = false; +} + void SslCredential::loadPrivCertStore() { // Get a handle to the system store or pkcs#12 file diff --git a/qpid/cpp/src/qpid/sys/windows/SslCredential.h b/qpid/cpp/src/qpid/sys/windows/SslCredential.h index ba16dcdab5..25d174a2fa 100644 --- a/qpid/cpp/src/qpid/sys/windows/SslCredential.h +++ b/qpid/cpp/src/qpid/sys/windows/SslCredential.h @@ -53,6 +53,8 @@ public: QPID_COMMON_EXTERN bool load(const std::string& certName); QPID_COMMON_EXTERN CredHandle handle(); QPID_COMMON_EXTERN std::string error(); + /** Proceed with connect inspite of hostname verifcation failures*/ + QPID_COMMON_EXTERN void ignoreHostnameVerificationFailure(); private: struct SavedError { @@ -70,6 +72,7 @@ private: CredHandle credHandle; TimeStamp credExpiry; SavedError loadError; + bool hostnameVerification; PCCERT_CONTEXT findCertificate(const std::string& name); void loadPrivCertStore(); diff --git a/qpid/cpp/src/qpid/sys/windows/Thread.cpp b/qpid/cpp/src/qpid/sys/windows/Thread.cpp index b342c9da1d..8034680664 100755 --- a/qpid/cpp/src/qpid/sys/windows/Thread.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Thread.cpp @@ -224,7 +224,7 @@ Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable)) Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {} Thread::operator bool() { - return impl; + return !!impl; } bool Thread::operator==(const Thread& t) const { diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp index f9820a82dd..6b979a016b 100644 --- a/qpid/cpp/src/qpid/types/Variant.cpp +++ b/qpid/cpp/src/qpid/types/Variant.cpp @@ -188,13 +188,13 @@ bool caseInsensitiveMatch(const std::string& a, const std::string& b) return a.size() == b.size() && std::equal(a.begin(), a.end(), b.begin(), &same_char); } -const std::string TRUE("True"); -const std::string FALSE("False"); +const std::string TRUE_STRING("True"); +const std::string FALSE_STRING("False"); bool toBool(const std::string& s) { - if (caseInsensitiveMatch(s, TRUE)) return true; - if (caseInsensitiveMatch(s, FALSE)) return false; + if (caseInsensitiveMatch(s, TRUE_STRING)) return true; + if (caseInsensitiveMatch(s, FALSE_STRING)) return false; try { return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) {} throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool")); } @@ -494,7 +494,7 @@ std::string VariantImpl::asString() const { switch(type) { case VAR_VOID: return EMPTY; - case VAR_BOOL: return value.b ? TRUE : FALSE; + case VAR_BOOL: return value.b ? TRUE_STRING : FALSE_STRING; case VAR_UINT8: return boost::lexical_cast<std::string>((int) value.ui8); case VAR_UINT16: return boost::lexical_cast<std::string>(value.ui16); case VAR_UINT32: return boost::lexical_cast<std::string>(value.ui32); diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 08a8c69d69..c914c50e33 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -364,6 +364,7 @@ add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py) add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py) if (BUILD_AMQP) add_test (interlink_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py) + add_test (idle_timeout_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/idle_timeout_tests.py) endif (BUILD_AMQP) add_test (swig_python_tests ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/swig_python_tests${test_script_suffix}) add_test (ipv6_test ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix}) diff --git a/qpid/cpp/src/tests/TimerTest.cpp b/qpid/cpp/src/tests/TimerTest.cpp index e9ca3fcdf6..d28eeeffc1 100644 --- a/qpid/cpp/src/tests/TimerTest.cpp +++ b/qpid/cpp/src/tests/TimerTest.cpp @@ -82,7 +82,7 @@ class TestTask : public TimerTask uint64_t difference = _abs64(expected - actual); #elif defined(_WIN32) uint64_t difference = labs(expected - actual); -#elif defined(__SUNPRO_CC) +#elif defined(__SUNPRO_CC) || defined (__IBMCPP__) uint64_t difference = llabs(expected - actual); #else uint64_t difference = abs(expected - actual); diff --git a/qpid/cpp/src/tests/idle_timeout_tests.py b/qpid/cpp/src/tests/idle_timeout_tests.py new file mode 100755 index 0000000000..c3cc00746b --- /dev/null +++ b/qpid/cpp/src/tests/idle_timeout_tests.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import shutil +import signal +import sys + +from brokertest import * + +class AmqpIdleTimeoutTest(BrokerTest): + """ + Test AMQP 1.0 idle-timeout support + """ + def setUp(self): + BrokerTest.setUp(self) + if not BrokerTest.amqp_lib: + raise Skipped("AMQP 1.0 library not found") + if qm != qpid_messaging: + raise Skipped("AMQP 1.0 client not found") + self._broker = self.broker() + + def test_client_timeout(self): + """Ensure that the client disconnects should the broker stop + responding. + """ + conn = self._broker.connect(native=False, timeout=None, + protocol="amqp1.0", heartbeat=1) + self.assertTrue(conn.isOpen()) + # should disconnect within 2 seconds of broker stop + deadline = time.time() + 8 + os.kill(self._broker.pid, signal.SIGSTOP) + while time.time() < deadline: + if not conn.isOpen(): + break; + self.assertTrue(not conn.isOpen()) + os.kill(self._broker.pid, signal.SIGCONT) + + + def test_broker_timeout(self): + """By default, the broker will adopt the same timeout as the client + (mimics the 0-10 timeout behavior). Verify the broker disconnects + unresponsive clients. + """ + + count = len(self._broker.agent.getAllConnections()) + + # Create a new connection to the broker: + receiver_cmd = ["qpid-receive", + "--broker", self._broker.host_port(), + "--address=amq.fanout", + "--connection-options={protocol:amqp1.0, heartbeat:1}", + "--forever"] + receiver = self.popen(receiver_cmd, stdout=PIPE, stderr=PIPE, + expect=EXPECT_UNKNOWN) + start = time.time() + deadline = time.time() + 10 + while time.time() < deadline: + if count < len(self._broker.agent.getAllConnections()): + break; + self.assertTrue(count < len(self._broker.agent.getAllConnections())) + + # now 'hang' the client, the broker should disconnect + start = time.time() + receiver.send_signal(signal.SIGSTOP) + deadline = time.time() + 10 + while time.time() < deadline: + if count == len(self._broker.agent.getAllConnections()): + break; + self.assertEqual(count, len(self._broker.agent.getAllConnections())) + receiver.send_signal(signal.SIGCONT) + receiver.teardown() + + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + os.execvp("qpid-python-test", + ["qpid-python-test", "-m", "idle_timeout_tests"] + sys.argv[1:]) |
