summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-10 16:15:08 +0000
committerKeith Wall <kwall@apache.org>2015-02-10 16:15:08 +0000
commit085486ebe5ff21133b9caf1c31625ac6ea356568 (patch)
tree7acbe9ca99a345dca71f9f80cd3e29ea4e3710f0 /qpid/cpp
parent60c62c03ca404e98e4fbd1abf4a5ebf50763d604 (diff)
parente2e6d542b8cde9e702d1c3b63376e9d8380ba1c7 (diff)
downloadqpid-python-085486ebe5ff21133b9caf1c31625ac6ea356568.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/CMakeLists.txt5
-rw-r--r--qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake59
-rw-r--r--qpid/cpp/INSTALL19
-rw-r--r--qpid/cpp/src/CMakeLists.txt12
-rw-r--r--qpid/cpp/src/config.h.cmake1
-rw-r--r--qpid/cpp/src/qpid/InlineAllocator.h2
-rw-r--r--qpid/cpp/src/qpid/NullSaslServer.cpp1
-rw-r--r--qpid/cpp/src/qpid/Options.cpp4
-rw-r--r--qpid/cpp/src/qpid/RangeSet.h6
-rw-r--r--qpid/cpp/src/qpid/RefCounted.h2
-rw-r--r--qpid/cpp/src/qpid/SessionId.h2
-rw-r--r--qpid/cpp/src/qpid/amqp/MapEncoder.h2
-rw-r--r--qpid/cpp/src/qpid/broker/LossyLvq.h11
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h2
-rw-r--r--qpid/cpp/src/qpid/broker/TopicKeyNode.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp82
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Filter.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp23
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h3
-rw-r--r--qpid/cpp/src/qpid/client/windows/SslConnector.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp149
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h7
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Transport.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp3
-rw-r--r--qpid/cpp/src/qpid/store/MessageStorePlugin.h2
-rw-r--r--qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/Thread.h2
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp6
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Thread.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Time.cpp14
-rw-r--r--qpid/cpp/src/qpid/sys/unordered_map.h2
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslCredential.cpp8
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslCredential.h3
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Thread.cpp2
-rw-r--r--qpid/cpp/src/qpid/types/Variant.cpp10
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/tests/TimerTest.cpp2
-rwxr-xr-xqpid/cpp/src/tests/idle_timeout_tests.py95
42 files changed, 488 insertions, 84 deletions
diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt
index 29a52cbb49..aaa4203c1e 100644
--- a/qpid/cpp/CMakeLists.txt
+++ b/qpid/cpp/CMakeLists.txt
@@ -182,6 +182,11 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL SunPro)
set (HIDE_SYMBOL_FLAGS "")
endif (CMAKE_CXX_COMPILER_ID STREQUAL SunPro)
+# XL is IBM XL C/C++
+if (CMAKE_CXX_COMPILER_ID MATCHES XL)
+ set (COMPILER_FLAGS "-qtls -qrtti")
+endif (CMAKE_CXX_COMPILER_ID MATCHES XL)
+
if (CMAKE_SYSTEM_NAME STREQUAL Windows)
# Allow MSVC user to select 'WinXP-SP3/Windows Server 2003' as build target version
set (win32_winnt_default OFF)
diff --git a/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake b/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake
new file mode 100644
index 0000000000..fb515cd149
--- /dev/null
+++ b/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake
@@ -0,0 +1,59 @@
+#
+# $Id $
+#
+# Author(s): Anton Deguet
+# Created on: 2011
+#
+# (C) Copyright 2011 Johns Hopkins University (JHU), All Rights
+# Reserved.
+#
+# --- begin cisst license - do not edit ---
+#
+# This software is provided "as is" under an open source license, with
+# no warranty. The complete license can be found in license.txt and
+# http://www.cisst.org/cisst/license.txt.
+#
+# --- end cisst license ---
+
+function (check_size_t_native_type VARIABLE)
+ # make sure we don't test over and over
+ if ("${VARIABLE}" MATCHES "^${VARIABLE}$")
+ message (STATUS "Checking to see if size_t is a native type")
+ set (SOURCE
+ "#include <vector>
+ char method(unsigned int p) {
+ return 'u';
+ }
+ char method(unsigned long long int p) {
+ return 'l';
+ }
+ char method(size_t p) {
+ return 's';
+ }
+ int main(void) {}")
+
+ file (WRITE
+ "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test_size_t.cpp"
+ "${SOURCE}\n")
+
+ try_compile (${VARIABLE}
+ ${CMAKE_BINARY_DIR}
+ "${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeTmp/test_size_t.cpp"
+ OUTPUT_VARIABLE OUTPUT)
+
+ # report using message and log files
+ if (${VARIABLE})
+ message (STATUS "Checking to see if size_t is a native type - yes")
+ file (APPEND ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeOutput.log
+ "Determining if size_t is a native type passed with "
+ "the following output:\n${OUTPUT}\n\n")
+ else (${VARIABLE})
+ message (STATUS "Checking to see if size_t is a native type - no")
+ file (APPEND ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/CMakeError.log
+ "Determining if size_t is a native type passed with "
+ "the following output:\n${OUTPUT}\n\n")
+ endif (${VARIABLE})
+
+ endif ("${VARIABLE}" MATCHES "^${VARIABLE}$")
+
+endfunction (check_size_t_native_type VARIABLE)
diff --git a/qpid/cpp/INSTALL b/qpid/cpp/INSTALL
index 0e91cddec8..717c9b0908 100644
--- a/qpid/cpp/INSTALL
+++ b/qpid/cpp/INSTALL
@@ -91,7 +91,7 @@ Or if you have only have a command line environment available
2.1 Building as C++11 (Experimental)
====================================
-Currently the Qpoid project uses C++ that conforms to the C++03 standard, as currently
+Currently the Qpid project uses C++ that conforms to the C++03 standard, as currently
this is the C++ standard that is supported the most widely, so any new code must also
compile as C++03. As an experiment (and to support a few extra platforms) the Qpid code
will also now build as C++11.
@@ -141,6 +141,23 @@ If you want to use the ports version of cyrus-sasl then you should also add:
Which will allow cmake to find libraries installed in /usr/local (which is where cyrus-sasl gets
installed by ports).
+2.4 Building on AIX
+===================
+Qpid has been tested on AIX 7.1 with XL C++ 13.1 and Boost 1.55.0. The
+thread-using variant of the compiler must be used but it isn't the default
+picked up by cmake. Thus, the compiler must be specified at cmake time.
+For example (assuming PATH includes the compiler binaries):
+
+ # CXX=xlC_r CC=cc_r cmake ..
+
+Warnings from Boost header files are expected and can be ignored.
+
+It is normal to see (lots of) multiply-defined symbol warnings when linking
+the shared libraries built as part of Qpid.
+
+The mktemp package must be installed separately in order to execute the
+Qpid test suite.
+
3. Building a Repository Working Copy
=====================================
To get the source code from the subversion repository (trunk) do:
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 3e5165dfb0..8263534614 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -42,6 +42,7 @@ include(CheckIncludeFiles)
include(CheckIncludeFileCXX)
include(CheckLibraryExists)
include(CheckSymbolExists)
+include(CheckSizeTNativeType)
find_package(PkgConfig)
find_package(Ruby)
@@ -350,6 +351,8 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
mark_as_advanced(QPID_POLLER)
endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+check_size_t_native_type (QPID_SIZE_T_NATIVE)
+
option(BUILD_SASL "Build with Cyrus SASL support" ${SASL_FOUND})
if (BUILD_SASL)
if (NOT SASL_FOUND)
@@ -684,6 +687,12 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
)
endif (CMAKE_SYSTEM_NAME STREQUAL SunOS)
+ if (CMAKE_SYSTEM_NAME STREQUAL AIX)
+ set (qpid_system_module
+ qpid/sys/aix/SystemInfo.cpp
+ )
+ endif (CMAKE_SYSTEM_NAME STREQUAL AIX)
+
if (CMAKE_CXX_COMPILER_ID STREQUAL SunPro)
# -lmalloc needed for mallinfo.
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lmalloc")
@@ -1145,6 +1154,9 @@ set_target_properties (qpidbroker PROPERTIES
VERSION ${qpidbroker_version}
SOVERSION ${qpidbroker_version_major}
COMPILE_DEFINITIONS _IN_QPID_BROKER)
+if (CMAKE_CXX_COMPILER_ID MATCHES XL)
+ set_target_properties (qpidbroker PROPERTIES LINK_FLAGS -Wl,-bbigtoc)
+endif (CMAKE_CXX_COMPILER_ID MATCHES XL)
if (MSVC)
set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290)
diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake
index 777fc1b893..478b369eb6 100644
--- a/qpid/cpp/src/config.h.cmake
+++ b/qpid/cpp/src/config.h.cmake
@@ -56,6 +56,7 @@
#cmakedefine HAVE_SYS_SDT_H ${HAVE_SYS_SDT_H}
#cmakedefine HAVE_LOG_AUTHPRIV
#cmakedefine HAVE_LOG_FTP
+#cmakedefine QPID_SIZE_T_NATIVE
#cmakedefine HAVE_PROTON_TRACER
#cmakedefine USE_PROTON_TRANSPORT_CONDITION
#cmakedefine HAVE_PROTON_EVENTS
diff --git a/qpid/cpp/src/qpid/InlineAllocator.h b/qpid/cpp/src/qpid/InlineAllocator.h
index 2502545dcb..28ea73ec12 100644
--- a/qpid/cpp/src/qpid/InlineAllocator.h
+++ b/qpid/cpp/src/qpid/InlineAllocator.h
@@ -47,7 +47,7 @@ class InlineAllocator : public BaseAllocator {
InlineAllocator() : allocated(false) {}
InlineAllocator(const InlineAllocator& x) : BaseAllocator(x), allocated(false) {}
- pointer allocate(size_type n) {
+ pointer allocate(size_type n, std::allocator<void>::const_pointer = 0) {
if (n <= Max && !allocated) {
allocated=true;
return reinterpret_cast<value_type*>(address());
diff --git a/qpid/cpp/src/qpid/NullSaslServer.cpp b/qpid/cpp/src/qpid/NullSaslServer.cpp
index 40bd9ebbc6..9d560c8e68 100644
--- a/qpid/cpp/src/qpid/NullSaslServer.cpp
+++ b/qpid/cpp/src/qpid/NullSaslServer.cpp
@@ -66,7 +66,6 @@ NullSaslServer::Status NullSaslServer::start(const std::string& mechanism, const
NullSaslServer::Status NullSaslServer::step(const std::string* /*response*/, std::string& /*challenge*/)
{
- assert(false);
return FAIL;
}
std::string NullSaslServer::getMechanisms()
diff --git a/qpid/cpp/src/qpid/Options.cpp b/qpid/cpp/src/qpid/Options.cpp
index cdba53449a..5ca91e6bd4 100644
--- a/qpid/cpp/src/qpid/Options.cpp
+++ b/qpid/cpp/src/qpid/Options.cpp
@@ -16,6 +16,7 @@
*
*/
+#include "config.h"
#include "qpid/Options.h"
#include "qpid/OptionsTemplates.h"
#include "qpid/Exception.h"
@@ -145,6 +146,9 @@ template QPID_COMMON_EXTERN po::value_semantic* create_value(int64_t& val, const
template QPID_COMMON_EXTERN po::value_semantic* create_value(uint16_t& val, const std::string& arg);
template QPID_COMMON_EXTERN po::value_semantic* create_value(uint32_t& val, const std::string& arg);
template QPID_COMMON_EXTERN po::value_semantic* create_value(uint64_t& val, const std::string& arg);
+#ifdef QPID_SIZE_T_NATIVE
+template QPID_COMMON_EXTERN po::value_semantic* create_value(size_t& val, const std::string& arg);
+#endif
template QPID_COMMON_EXTERN po::value_semantic* create_value(double& val, const std::string& arg);
template QPID_COMMON_EXTERN po::value_semantic* create_value(string& val, const std::string& arg);
diff --git a/qpid/cpp/src/qpid/RangeSet.h b/qpid/cpp/src/qpid/RangeSet.h
index 78677f1263..20ee722fcb 100644
--- a/qpid/cpp/src/qpid/RangeSet.h
+++ b/qpid/cpp/src/qpid/RangeSet.h
@@ -96,9 +96,9 @@ class Range {
*/
template <class T>
class RangeSet
- : boost::additive1<RangeSet<T>,
- boost::additive2<RangeSet<T>, Range<T>,
- boost::additive2<RangeSet<T>, T> > >
+ : private boost::additive1<RangeSet<T>,
+ boost::additive2<RangeSet<T>, Range<T>,
+ boost::additive2<RangeSet<T>, T> > >
{
typedef InlineVector<Range<T>, 3> Ranges; // TODO aconway 2008-04-21: what's the optimial inlined value?
diff --git a/qpid/cpp/src/qpid/RefCounted.h b/qpid/cpp/src/qpid/RefCounted.h
index 26e3e2c4ba..c2ec367658 100644
--- a/qpid/cpp/src/qpid/RefCounted.h
+++ b/qpid/cpp/src/qpid/RefCounted.h
@@ -33,7 +33,7 @@ namespace qpid {
* to the class that has mixed this in not the class itself (as that would sidestep
* the reference counting)
*/
-class RefCounted : boost::noncopyable {
+class RefCounted : private boost::noncopyable {
mutable boost::detail::atomic_count count;
public:
diff --git a/qpid/cpp/src/qpid/SessionId.h b/qpid/cpp/src/qpid/SessionId.h
index e18b360999..d950ad9d1a 100644
--- a/qpid/cpp/src/qpid/SessionId.h
+++ b/qpid/cpp/src/qpid/SessionId.h
@@ -39,7 +39,7 @@ namespace qpid {
* The name must be unique among sessions with the same authentication
* principal.
*/
-class SessionId : boost::totally_ordered1<SessionId> {
+class SessionId : private boost::totally_ordered1<SessionId> {
std::string userId;
std::string name;
public:
diff --git a/qpid/cpp/src/qpid/amqp/MapEncoder.h b/qpid/cpp/src/qpid/amqp/MapEncoder.h
index 42fb819932..1481f9125a 100644
--- a/qpid/cpp/src/qpid/amqp/MapEncoder.h
+++ b/qpid/cpp/src/qpid/amqp/MapEncoder.h
@@ -31,7 +31,7 @@ struct Descriptor;
/**
* Encode map like data
*/
-class MapEncoder : public MapHandler, Encoder
+class MapEncoder : public MapHandler, private Encoder
{
public:
MapEncoder(char* data, size_t size);
diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.h b/qpid/cpp/src/qpid/broker/LossyLvq.h
index e0a266ab77..2665ebe49f 100644
--- a/qpid/cpp/src/qpid/broker/LossyLvq.h
+++ b/qpid/cpp/src/qpid/broker/LossyLvq.h
@@ -28,6 +28,12 @@ namespace qpid {
namespace broker {
class MessageMap;
+// Disable inherited-by-dominance warning on MSVC. We know. It's ok.
+#ifdef _MSC_VER
+# pragma warning(push)
+# pragma warning(disable : 4250)
+#endif
+
/**
* Combination of LossyQueue and Lvq behaviours.
*/
@@ -36,6 +42,11 @@ class LossyLvq : public Lvq, public LossyQueue
public:
LossyLvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
};
+
+#ifdef _MSC_VER
+# pragma warning(pop)
+#endif
+
}} // namespace qpid::broker
#endif /*!QPID_BROKER_LOSSYLVQ_H*/
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index 0eede36f3f..af4e8e50fb 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -44,7 +44,7 @@ class OwnershipToken;
* are deleted when and only when they are no longer in use.
*
*/
-class QueueRegistry : QueueFactory {
+class QueueRegistry : private QueueFactory {
public:
QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0);
QPID_BROKER_EXTERN ~QueueRegistry();
diff --git a/qpid/cpp/src/qpid/broker/TopicKeyNode.h b/qpid/cpp/src/qpid/broker/TopicKeyNode.h
index 7671ed069d..ac760b198e 100644
--- a/qpid/cpp/src/qpid/broker/TopicKeyNode.h
+++ b/qpid/cpp/src/qpid/broker/TopicKeyNode.h
@@ -166,7 +166,7 @@ class QPID_BROKER_CLASS_EXTERN TopicKeyNode {
bool isHash;
// children
- typedef std::map<const std::string, typename TopicKeyNode::shared_ptr> ChildMap;
+ typedef std::map<std::string, typename TopicKeyNode::shared_ptr> ChildMap;
ChildMap childTokens;
typename TopicKeyNode::shared_ptr starChild; // "*" subtree
typename TopicKeyNode::shared_ptr hashChild; // "#" subtree
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 3a93e2aac5..8d6516edee 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -121,7 +121,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
connection(pn_connection()),
transport(pn_transport()),
collector(0),
- out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)
+ out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false), ioRequested(false)
{
#ifdef HAVE_PROTON_EVENTS
collector = pn_collector();
@@ -157,6 +157,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
void Connection::requestIO()
{
+ ioRequested = true;
out.activateOutput();
}
@@ -179,13 +180,24 @@ size_t Connection::decode(const char* buffer, size_t size)
{
QPID_LOG(trace, id << " decode(" << size << ")");
if (size == 0) return 0;
- //TODO: Fix pn_engine_input() to take const buffer
+
ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size);
if (n > 0 || n == PN_EOS) {
- //If engine returns EOS, have no way of knowing how many bytes
- //it processed, but can assume none need to be reprocessed so
- //consider them all read:
- if (n == PN_EOS) n = size;
+ // PN_EOS either means we received a Close (which also means we've
+ // consumed all the input), OR some Very Bad Thing happened and this
+ // connection is toast.
+ if (n == PN_EOS)
+ {
+ std::string error;
+ if (checkTransportError(error)) {
+ // "He's dead, Jim."
+ QPID_LOG_CAT(error, network, id << " connection failed: " << error);
+ out.abort();
+ return 0;
+ } else {
+ n = size; // assume all consumed
+ }
+ }
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size);
try {
process();
@@ -209,7 +221,11 @@ size_t Connection::decode(const char* buffer, size_t size)
}
return n;
} else if (n == PN_ERR) {
- throw Exception(qpid::amqp::error_conditions::DECODE_ERROR, QPID_MSG("Error on input: " << getError()));
+ std::string error;
+ checkTransportError(error);
+ QPID_LOG_CAT(error, network, id << " connection error: " << error);
+ out.abort();
+ return 0;
} else {
return 0;
}
@@ -224,8 +240,21 @@ size_t Connection::encode(char* buffer, size_t size)
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
haveOutput = true;
return n;
+ } else if (n == PN_EOS) {
+ haveOutput = false;
+ // Normal close, or error?
+ std::string error;
+ if (checkTransportError(error)) {
+ QPID_LOG_CAT(error, network, id << " connection failed: " << error);
+ out.abort();
+ }
+ return 0;
} else if (n == PN_ERR) {
- throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError()));
+ std::string error;
+ checkTransportError(error);
+ QPID_LOG_CAT(error, network, id << " connection error: " << error);
+ out.abort();
+ return 0;
} else {
haveOutput = false;
return 0;
@@ -291,6 +320,7 @@ bool Connection::canEncode()
} else {
QPID_LOG(info, "Connection " << id << " has been closed locally");
}
+ if (ioRequested.valueCompareAndSwap(true, false)) haveOutput = true;
pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
return haveOutput;
@@ -303,8 +333,22 @@ void Connection::open()
pn_connection_set_container(connection, getBroker().getFederationTag().c_str());
uint32_t timeout = pn_transport_get_remote_idle_timeout(transport);
if (timeout) {
- ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(timeout, getBroker().getTimer(), *this));
- pn_transport_set_idle_timeout(transport, timeout);
+ // if idle generate empty frames at 1/2 the timeout interval as keepalives:
+ ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask((timeout+1)/2,
+ getBroker().getTimer(),
+ *this));
+ getBroker().getTimer().add(ticker);
+
+ // Note: in version 0-10 of the protocol, idle timeout applies to both
+ // ends. AMQP 1.0 changes that - it's now asymmetric: each end can
+ // configure/disable it independently. For backward compatibility, by
+ // default mimic the old behavior and set our local timeout.
+ // Use 2x the remote's timeout, as per the spec the remote should
+ // advertise 1/2 its actual timeout threshold
+ pn_transport_set_idle_timeout(transport, timeout * 2);
+ QPID_LOG_CAT(debug, network, id << " AMQP 1.0 idle-timeout set:"
+ << " local=" << pn_transport_get_idle_timeout(transport)
+ << " remote=" << pn_transport_get_remote_idle_timeout(transport));
}
pn_connection_open(connection);
@@ -585,4 +629,22 @@ void Connection::doDeliveryUpdated(pn_delivery_t *delivery)
}
}
+// check for failures of the transport:
+bool Connection::checkTransportError(std::string& text)
+{
+ std::stringstream info;
+
+#ifdef USE_PROTON_TRANSPORT_CONDITION
+ pn_condition_t* tcondition = pn_transport_condition(transport);
+ if (pn_condition_is_set(tcondition))
+ info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition);
+#else
+ pn_error_t* terror = pn_transport_error(transport);
+ if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]";
+#endif
+
+ text = info.str();
+ return !text.empty();
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index ea4ce06163..e97d041c03 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -24,6 +24,7 @@
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/broker/amqp/BrokerContext.h"
#include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/sys/AtomicValue.h"
#include <map>
#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
@@ -80,6 +81,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
bool closeInitiated;
bool closeRequested;
boost::intrusive_ptr<sys::TimerTask> ticker;
+ qpid::sys::AtomicValue<bool> ioRequested;
virtual void process();
void doOutput(size_t);
@@ -92,6 +94,8 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
void closedByManagement();
private:
+ bool checkTransportError(std::string&);
+
// handle Proton engine events
void doConnectionRemoteOpen();
void doConnectionRemoteClose();
@@ -100,7 +104,6 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
void doLinkRemoteOpen(pn_link_t *link);
void doLinkRemoteClose(pn_link_t *link);
void doDeliveryUpdated(pn_delivery_t *delivery);
-
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.h b/qpid/cpp/src/qpid/broker/amqp/Filter.h
index e12e9e412b..c246fb9ede 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Filter.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Filter.h
@@ -37,7 +37,7 @@ struct QueueSettings;
namespace amqp {
class Outgoing;
-class Filter : qpid::amqp::MapReader
+class Filter : private qpid::amqp::MapReader
{
public:
Filter();
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 0225ee74cb..ed931c90fb 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -548,8 +548,8 @@ Subscription::Subscription(const Address& address, const std::string& type)
if ((Opt(address)/LINK).hasKey(TIMEOUT)) {
const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
- } else if (durable && !reliable && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) {
- //if durable but not reliable, and auto-delete not
+ } else if (durable && !AddressResolution::is_reliable(address) && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) {
+ //if durable, not explicitly reliable, and auto-delete not
//explicitly set, then set a non-zero default for the
//autodelete timeout
queueOptions.setInt("qpid.auto_delete_timeout", 2*60);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index e300ddae8b..2ca2c85c64 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -164,8 +164,13 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
ScopedRelease release(inUse, lock);
sys::Mutex::ScopedUnlock l(lock);
//wait for suitable new message to arrive
- if (process(&handler, get_duration(timeout, deadline))) {
+ switch (process(&handler, get_duration(timeout, deadline))) {
+ case OK:
return true;
+ case CLOSED:
+ return false;
+ case EMPTY:
+ break;
}
}
if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed");
@@ -233,7 +238,7 @@ void IncomingMessages::releaseAll()
}
//then pump out any available messages from incoming queue...
GetAny handler;
- while (process(&handler, 0)) ;
+ while (process(&handler, 0) == OK) ;
//now release all messages
sys::Mutex::ScopedLock l(lock);
acceptTracker.release(session);
@@ -242,7 +247,7 @@ void IncomingMessages::releaseAll()
void IncomingMessages::releasePending(const std::string& destination)
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) ;
+ while (process(0, 0) == OK) ;
//now remove all messages for this destination from received list, recording their ids...
sys::Mutex::ScopedLock l(lock);
@@ -269,7 +274,7 @@ bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration ti
* that are not accepted by the handler are pushed onto received queue
* for later retrieval.
*/
-bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
+IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
AbsTime deadline(AbsTime::now(), duration);
FrameSet::shared_ptr content;
@@ -282,7 +287,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
} else if (handler && handler->accept(transfer)) {
QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
<< *content->getHeaders());
- return true;
+ return OK;
} else {
//received message for another destination, keep for later
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
@@ -295,8 +300,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
}
}
}
- catch (const qpid::ClosedException&) {} // Just return false if queue closed.
- return false;
+ catch (const qpid::ClosedException&) { return CLOSED; }
+ return EMPTY;
}
bool IncomingMessages::wait(qpid::sys::Duration duration)
@@ -331,7 +336,7 @@ uint32_t IncomingMessages::pendingAccept(const std::string& destination)
uint32_t IncomingMessages::available()
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) {}
+ while (process(0, 0) == OK) {}
//return the count of received messages
sys::Mutex::ScopedLock l(lock);
return received.size();
@@ -340,7 +345,7 @@ uint32_t IncomingMessages::available()
uint32_t IncomingMessages::available(const std::string& destination)
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) {}
+ while (process(0, 0) == OK) {}
//count all messages for this destination from received list
sys::Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index c9ea0673a3..4c9ee68ece 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -87,6 +87,7 @@ class IncomingMessages
uint32_t available(const std::string& destination);
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
+ enum ProcessState {EMPTY=0,OK=1,CLOSED=2};
sys::Monitor lock;
qpid::client::AsyncSession session;
@@ -95,7 +96,7 @@ class IncomingMessages
FrameSetQueue received;
AcceptTracker acceptTracker;
- bool process(Handler*, qpid::sys::Duration);
+ ProcessState process(Handler*, qpid::sys::Duration);
bool wait(qpid::sys::Duration);
bool pop(FrameSetPtr&, qpid::sys::Duration);
diff --git a/qpid/cpp/src/qpid/client/windows/SslConnector.cpp b/qpid/cpp/src/qpid/client/windows/SslConnector.cpp
index d0be818df0..dc82ece9d1 100644
--- a/qpid/cpp/src/qpid/client/windows/SslConnector.cpp
+++ b/qpid/cpp/src/qpid/client/windows/SslConnector.cpp
@@ -115,7 +115,9 @@ SslConnector::SslConnector(boost::shared_ptr<qpid::sys::Poller> p,
ConnectionImpl* cimpl)
: TCPConnector(p, ver, settings, cimpl), shim(0), poller(p)
{
-
+ if (settings.sslIgnoreHostnameVerificationFailure) {
+ sslCredential.ignoreHostnameVerificationFailure();
+ }
const std::string& name = (settings.sslCertName != "") ?
settings.sslCertName : qpid::sys::ssl::SslOptions::global.certName;
certLoaded = sslCredential.load(name);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index fedab4286f..a0b16c2b4c 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -39,6 +39,7 @@
#include "qpid/sys/SecurityLayer.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/urlAdd.h"
#include "config.h"
#include <boost/lexical_cast.hpp>
@@ -95,6 +96,27 @@ std::string get_error(pn_connection_t* connection, pn_transport_t* transport)
}
#endif
+class ConnectionTickerTask : public qpid::sys::TimerTask
+{
+ qpid::sys::Timer& timer;
+ ConnectionContext& connection;
+ public:
+ ConnectionTickerTask(const qpid::sys::Duration& interval, qpid::sys::Timer& t, ConnectionContext& c) :
+ TimerTask(interval, "ConnectionTicker"),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ QPID_LOG(debug, "ConnectionTickerTask fired");
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+
+ // Send Ticker
+ connection.activateOutput();
+ }
+};
}
void ConnectionContext::trace(const char* message) const
@@ -118,23 +140,15 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::
// Concatenate all known URLs into a single URL, get rid of duplicate addresses.
sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ?
qpid::Address::TCP : protocol);
- if (pn_transport_bind(engine, connection)) {
- //error
- }
if (identifier.empty()) {
identifier = qpid::types::Uuid(true).str();
}
- pn_connection_set_container(connection, identifier.c_str());
- bool enableTrace(false);
- QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
- if (enableTrace) {
- pn_transport_trace(engine, PN_TRACE_FRM);
- set_tracer(engine, this);
- }
+ configureConnection();
}
ConnectionContext::~ConnectionContext()
{
+ if (ticker) ticker->cancel();
close();
sessions.clear();
pn_transport_free(engine);
@@ -218,6 +232,10 @@ void ConnectionContext::close()
lock.wait();
}
}
+ if (ticker) {
+ ticker->cancel();
+ ticker.reset();
+ }
}
bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -498,7 +516,7 @@ uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> rece
void ConnectionContext::activateOutput()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- wakeupDriver();
+ if (state == CONNECTED) wakeupDriver();
}
/**
* Expects lock to be held by caller
@@ -530,14 +548,11 @@ void ConnectionContext::reset()
engine = pn_transport();
connection = pn_connection();
- pn_connection_set_container(connection, identifier.c_str());
- bool enableTrace(false);
- QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
- if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
+ configureConnection();
+
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
i->second->reset(connection);
}
- pn_transport_bind(engine, connection);
}
void ConnectionContext::check() {
@@ -758,16 +773,31 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
//TODO: Fix pn_engine_input() to take const buffer
ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size);
if (n > 0 || n == PN_EOS) {
- //If engine returns EOS, have no way of knowing how many bytes
- //it processed, but can assume none need to be reprocessed so
- //consider them all read:
- if (n == PN_EOS) n = size;
+ // PN_EOS either means we received a Close (which also means we've
+ // consumed all the input), OR some Very Bad Thing happened and this
+ // connection is toast.
+ if (n == PN_EOS)
+ {
+ std::string error;
+ if (checkTransportError(error)) {
+ // "He's dead, Jim."
+ QPID_LOG_CAT(error, network, id << " connection failed: " << error);
+ transport->abort();
+ return 0;
+ } else {
+ n = size; // assume all consumed
+ }
+ }
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size)
- pn_transport_tick(engine, 0);
+ pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
lock.notifyAll();
return n;
} else if (n == PN_ERR) {
- throw MessagingException(QPID_MSG("Error on input: " << getError()));
+ std::string error;
+ checkTransportError(error);
+ QPID_LOG_CAT(error, network, id << " connection error: " << error);
+ transport->abort();
+ return 0;
} else {
return 0;
}
@@ -792,10 +822,20 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
if (notifyOnWrite) lock.notifyAll();
return n;
} else if (n == PN_ERR) {
- throw MessagingException(QPID_MSG("Error on output: " << getError()));
+ std::string error;
+ checkTransportError(error);
+ QPID_LOG_CAT(error, network, id << " connection error: " << error);
+ transport->abort();
+ return 0;
} else if (n == PN_EOS) {
haveOutput = false;
- return 0;//Is this right?
+ // Normal close, or error?
+ std::string error;
+ if (checkTransportError(error)) {
+ QPID_LOG_CAT(error, network, id << " connection failed: " << error);
+ transport->abort();
+ }
+ return 0;
} else {
haveOutput = false;
return 0;
@@ -804,6 +844,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
bool ConnectionContext::canEncodePlain()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
return haveOutput && state == CONNECTED;
}
void ConnectionContext::closed()
@@ -1061,7 +1102,6 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) {
}
QPID_LOG(debug, id << " Opening...");
- setProperties();
pn_connection_open(connection);
wakeupDriver(); //want to write
while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
@@ -1071,6 +1111,25 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) {
if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
throw qpid::messaging::ConnectionError("Failed to open connection");
}
+
+ // Connection open - check for idle timeout from the remote and start a
+ // periodic tick to monitor for idle connections
+ pn_timestamp_t remote = pn_transport_get_remote_idle_timeout(engine);
+ pn_timestamp_t local = pn_transport_get_idle_timeout(engine);
+ uint64_t shortest = ((remote && local)
+ ? std::min(remote, local)
+ : (remote) ? remote : local);
+ if (shortest) {
+ // send an idle frame at least twice before timeout
+ shortest = (shortest + 1)/2;
+ qpid::sys::Duration d(shortest * qpid::sys::TIME_MSEC);
+ ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(d, driver->getTimer(), *this));
+ driver->getTimer().add(ticker);
+ QPID_LOG(debug, id << " AMQP 1.0 idle-timeout set:"
+ << " local=" << pn_transport_get_idle_timeout(engine)
+ << " remote=" << pn_transport_get_remote_idle_timeout(engine));
+ }
+
QPID_LOG(debug, id << " Opened");
return restartSessions();
@@ -1151,4 +1210,44 @@ bool ConnectionContext::CodecAdapter::canEncode()
}
+// setup the transport and connection objects:
+void ConnectionContext::configureConnection()
+{
+ pn_connection_set_container(connection, identifier.c_str());
+ setProperties();
+ if (heartbeat) {
+ // fail an idle connection at 2 x heartbeat (in msecs)
+ pn_transport_set_idle_timeout(engine, heartbeat*2*1000);
+ }
+
+ bool enableTrace(false);
+ QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
+ if (enableTrace) {
+ pn_transport_trace(engine, PN_TRACE_FRM);
+ set_tracer(engine, this);
+ }
+
+ int err = pn_transport_bind(engine, connection);
+ if (err)
+ QPID_LOG(error, id << " Error binding connection and transport: " << err);
+}
+
+
+// check for failures of the transport:
+bool ConnectionContext::checkTransportError(std::string& text)
+{
+ std::stringstream info;
+
+#ifdef USE_PROTON_TRANSPORT_CONDITION
+ pn_condition_t* tcondition = pn_transport_condition(engine);
+ if (pn_condition_is_set(tcondition))
+ info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition);
+#else
+ pn_error_t* terror = pn_transport_error(engine);
+ if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]";
+#endif
+
+ text = info.str();
+ return !text.empty();
+}
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 12f3c21e0a..80da9dff10 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -25,6 +25,7 @@
#include <map>
#include <memory>
#include <string>
+#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/messaging/ConnectionOptions.h"
@@ -47,6 +48,7 @@ class ProtocolVersion;
namespace sys {
class SecurityLayer;
struct SecuritySettings;
+class TimerTask;
}
namespace messaging {
class Duration;
@@ -120,7 +122,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void initSecurityLayer(qpid::sys::SecurityLayer&);
void trace(const char*) const;
- private:
+ private:
typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap;
class CodecAdapter : public qpid::sys::Codec
{
@@ -155,6 +157,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
std::auto_ptr<Sasl> sasl;
CodecAdapter codecAdapter;
bool notifyOnWrite;
+ boost::intrusive_ptr<qpid::sys::TimerTask> ticker;
void check();
bool checkDisconnected();
@@ -191,6 +194,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
std::string getError();
bool useSasl();
void setProperties();
+ void configureConnection();
+ bool checkTransportError(std::string&);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
index 16307b3c22..ebe3fff1cb 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
@@ -22,13 +22,14 @@
#include "Transport.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace messaging {
namespace amqp {
-DriverImpl::DriverImpl() : poller(new qpid::sys::Poller)
+DriverImpl::DriverImpl() : poller(new qpid::sys::Poller), timer(new qpid::sys::Timer)
{
start();
}
@@ -48,6 +49,7 @@ void DriverImpl::stop()
QPID_LOG(debug, "Driver stopped");
poller->shutdown();
thread.join();
+ timer->stop();
}
boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h
index 354fa1ae35..36cb196343 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h
@@ -29,6 +29,7 @@
namespace qpid {
namespace sys {
class Poller;
+class Timer;
}
namespace messaging {
namespace amqp {
@@ -47,11 +48,14 @@ class DriverImpl
void stop();
boost::shared_ptr<Transport> getTransport(const std::string& protocol, TransportContext& connection);
+ sys::Timer& getTimer() { return *timer; }
static boost::shared_ptr<DriverImpl> getDefault();
private:
boost::shared_ptr<qpid::sys::Poller> poller;
qpid::sys::Thread thread;
+ std::auto_ptr<sys::Timer> timer;
+
static qpid::sys::Mutex defaultLock;
static boost::weak_ptr<DriverImpl> theDefault;
};
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transport.h b/qpid/cpp/src/qpid/messaging/amqp/Transport.h
index 159916f9ae..6ec99ab58f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/Transport.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/Transport.h
@@ -40,6 +40,7 @@ class Transport : public qpid::sys::OutputControl
virtual ~Transport() {}
virtual void connect(const std::string& host, const std::string& port) = 0;
virtual void close() = 0;
+ virtual void abort() = 0;
virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp b/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp
index ad47fd98d9..5dbc13175f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/windows/SslTransport.cpp
@@ -94,6 +94,9 @@ void SslTransport::negotiationDone(SECURITY_STATUS status)
SslTransport::SslTransport(TransportContext& c, boost::shared_ptr<Poller> p) : TcpTransport(c, p)
{
const ConnectionOptions* options = context.getOptions();
+ if (options->sslIgnoreHostnameVerificationFailure) {
+ sslCredential.ignoreHostnameVerificationFailure();
+ }
const std::string& name = (options->sslCertName != "") ?
options->sslCertName : qpid::sys::ssl::SslOptions::global.certName;
certLoaded = sslCredential.load(name);
diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.h b/qpid/cpp/src/qpid/store/MessageStorePlugin.h
index 1fcde6683d..5290fc16db 100644
--- a/qpid/cpp/src/qpid/store/MessageStorePlugin.h
+++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.h
@@ -267,7 +267,7 @@ class MessageStorePlugin :
};
StoreOptions options;
- typedef std::map<const std::string, StorageProvider*> ProviderMap;
+ typedef std::map<std::string, StorageProvider*> ProviderMap;
ProviderMap providers;
ProviderMap::const_iterator provider;
diff --git a/qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp b/qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp
index 4e4a1289d1..076056b2af 100644
--- a/qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp
+++ b/qpid/cpp/src/qpid/sys/FreeBSD/uuid.cpp
@@ -40,5 +40,5 @@ void uuid_generate (uint8_t out[qpid::sys::UuidSize])
out[7] = (uuid.time_hi_and_version & 0x00ff);
out[8] = uuid.clock_seq_hi_and_reserved;
out[9] = uuid.clock_seq_low;
- ::memcpy(&out[10], &uuid.node, _UUID_NODE_LEN);
+ ::memcpy(&out[10], &uuid.node, sizeof(uuid.node));
}
diff --git a/qpid/cpp/src/qpid/sys/Thread.h b/qpid/cpp/src/qpid/sys/Thread.h
index f556612908..cb0a1adce6 100644
--- a/qpid/cpp/src/qpid/sys/Thread.h
+++ b/qpid/cpp/src/qpid/sys/Thread.h
@@ -34,6 +34,8 @@
# define QPID_TSS __thread
#elif defined (__SUNPRO_CC)
# define QPID_TSS __thread
+#elif defined (__IBMCPP__)
+# define QPID_TSS __thread
#else
# error "Dont know how to define QPID_TSS for this platform"
#endif
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index eebf02e867..7d04d2214d 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -93,7 +93,7 @@ private:
AsynchAcceptor::AsynchAcceptor(const Socket& s,
AsynchAcceptor::Callback callback) :
acceptedCallback(callback),
- handle(s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
+ handle((const IOHandle&)s, boost::bind(&AsynchAcceptor::readable, this, _1), 0, 0),
socket(s) {
s.setNonblocking();
@@ -167,7 +167,7 @@ AsynchConnector::AsynchConnector(const Socket& s,
const std::string& port,
ConnectedCallback connCb,
FailedCallback failCb) :
- DispatchHandle(s,
+ DispatchHandle((const IOHandle&)s,
0,
boost::bind(&AsynchConnector::connComplete, this, _1),
boost::bind(&AsynchConnector::connComplete, this, _1)),
@@ -308,7 +308,7 @@ AsynchIO::AsynchIO(const Socket& s,
ReadCallback rCb, EofCallback eofCb, DisconnectCallback disCb,
ClosedCallback cCb, BuffersEmptyCallback eCb, IdleCallback iCb) :
- DispatchHandle(s,
+ DispatchHandle((const IOHandle&)s,
boost::bind(&AsynchIO::readable, this, _1),
boost::bind(&AsynchIO::writeable, this, _1),
boost::bind(&AsynchIO::disconnected, this, _1)),
diff --git a/qpid/cpp/src/qpid/sys/posix/Thread.cpp b/qpid/cpp/src/qpid/sys/posix/Thread.cpp
index 2075e1dfbc..349e35d643 100644
--- a/qpid/cpp/src/qpid/sys/posix/Thread.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Thread.cpp
@@ -59,7 +59,7 @@ Thread::operator bool() {
}
bool Thread::operator==(const Thread& t) const {
- return ::pthread_equal(impl->thread, t.impl->thread) != 0;
+ return pthread_equal(impl->thread, t.impl->thread) != 0;
}
bool Thread::operator!=(const Thread& t) const {
diff --git a/qpid/cpp/src/qpid/sys/posix/Time.cpp b/qpid/cpp/src/qpid/sys/posix/Time.cpp
index c44640b3e0..10a5d944b1 100644
--- a/qpid/cpp/src/qpid/sys/posix/Time.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Time.cpp
@@ -24,6 +24,7 @@
#include "qpid/sys/Time.h"
#include <ostream>
#include <istream>
+#include <sstream>
#include <time.h>
#include <stdio.h>
#include <sys/time.h>
@@ -106,13 +107,14 @@ std::istream& operator>>(std::istream& i, Duration& d) {
if (i.eof() || std::isspace(i.peek())) // No suffix
d = int64_t(number*TIME_SEC);
else {
- std::string suffix;
- i >> suffix;
+ std::stringbuf suffix;
+ i >> &suffix;
if (i.fail()) return i;
- if (suffix.compare("s") == 0) d = int64_t(number*TIME_SEC);
- else if (suffix.compare("ms") == 0) d = int64_t(number*TIME_MSEC);
- else if (suffix.compare("us") == 0) d = int64_t(number*TIME_USEC);
- else if (suffix.compare("ns") == 0) d = int64_t(number*TIME_NSEC);
+ std::string suffix_str = suffix.str();
+ if (suffix_str.compare("s") == 0) d = int64_t(number*TIME_SEC);
+ else if (suffix_str.compare("ms") == 0) d = int64_t(number*TIME_MSEC);
+ else if (suffix_str.compare("us") == 0) d = int64_t(number*TIME_USEC);
+ else if (suffix_str.compare("ns") == 0) d = int64_t(number*TIME_NSEC);
else i.setstate(std::ios::failbit);
}
return i;
diff --git a/qpid/cpp/src/qpid/sys/unordered_map.h b/qpid/cpp/src/qpid/sys/unordered_map.h
index 22cedad299..1b27770804 100644
--- a/qpid/cpp/src/qpid/sys/unordered_map.h
+++ b/qpid/cpp/src/qpid/sys/unordered_map.h
@@ -23,7 +23,7 @@
#if defined(_MSC_VER) || defined(_LIBCPP_VERSION) || __cplusplus >= 201103L
# include <unordered_map>
-#elif defined(__SUNPRO_CC)
+#elif defined(__SUNPRO_CC) || defined(__IBMCPP__)
# include <boost/tr1/unordered_map.hpp>
#else
# include <tr1/unordered_map>
diff --git a/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp b/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp
index 667f0f1ef0..de8f10b0e9 100644
--- a/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/SslCredential.cpp
@@ -34,7 +34,7 @@ namespace sys {
namespace windows {
-SslCredential::SslCredential() : certStore(0), cert(0)
+SslCredential::SslCredential() : certStore(0), cert(0), hostnameVerification(true)
{
SecInvalidateHandle(&credHandle);
memset(&cred, 0, sizeof(cred));
@@ -60,6 +60,8 @@ bool SslCredential::load(const std::string& certName)
cred.paCred = &cert;
cred.cCreds = 1;
}
+ if (!hostnameVerification)
+ cred.dwFlags |= SCH_CRED_NO_SERVERNAME_CHECK;
SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL,
UNISP_NAME,
@@ -89,6 +91,10 @@ std::string SslCredential::error()
return loadError.error;
}
+void SslCredential::ignoreHostnameVerificationFailure(){
+ hostnameVerification = false;
+}
+
void SslCredential::loadPrivCertStore()
{
// Get a handle to the system store or pkcs#12 file
diff --git a/qpid/cpp/src/qpid/sys/windows/SslCredential.h b/qpid/cpp/src/qpid/sys/windows/SslCredential.h
index ba16dcdab5..25d174a2fa 100644
--- a/qpid/cpp/src/qpid/sys/windows/SslCredential.h
+++ b/qpid/cpp/src/qpid/sys/windows/SslCredential.h
@@ -53,6 +53,8 @@ public:
QPID_COMMON_EXTERN bool load(const std::string& certName);
QPID_COMMON_EXTERN CredHandle handle();
QPID_COMMON_EXTERN std::string error();
+ /** Proceed with connect inspite of hostname verifcation failures*/
+ QPID_COMMON_EXTERN void ignoreHostnameVerificationFailure();
private:
struct SavedError {
@@ -70,6 +72,7 @@ private:
CredHandle credHandle;
TimeStamp credExpiry;
SavedError loadError;
+ bool hostnameVerification;
PCCERT_CONTEXT findCertificate(const std::string& name);
void loadPrivCertStore();
diff --git a/qpid/cpp/src/qpid/sys/windows/Thread.cpp b/qpid/cpp/src/qpid/sys/windows/Thread.cpp
index b342c9da1d..8034680664 100755
--- a/qpid/cpp/src/qpid/sys/windows/Thread.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/Thread.cpp
@@ -224,7 +224,7 @@ Thread::Thread(Runnable* runnable) : impl(ThreadPrivate::createThread(runnable))
Thread::Thread(Runnable& runnable) : impl(ThreadPrivate::createThread(&runnable)) {}
Thread::operator bool() {
- return impl;
+ return !!impl;
}
bool Thread::operator==(const Thread& t) const {
diff --git a/qpid/cpp/src/qpid/types/Variant.cpp b/qpid/cpp/src/qpid/types/Variant.cpp
index f9820a82dd..6b979a016b 100644
--- a/qpid/cpp/src/qpid/types/Variant.cpp
+++ b/qpid/cpp/src/qpid/types/Variant.cpp
@@ -188,13 +188,13 @@ bool caseInsensitiveMatch(const std::string& a, const std::string& b)
return a.size() == b.size() && std::equal(a.begin(), a.end(), b.begin(), &same_char);
}
-const std::string TRUE("True");
-const std::string FALSE("False");
+const std::string TRUE_STRING("True");
+const std::string FALSE_STRING("False");
bool toBool(const std::string& s)
{
- if (caseInsensitiveMatch(s, TRUE)) return true;
- if (caseInsensitiveMatch(s, FALSE)) return false;
+ if (caseInsensitiveMatch(s, TRUE_STRING)) return true;
+ if (caseInsensitiveMatch(s, FALSE_STRING)) return false;
try { return boost::lexical_cast<int>(s); } catch(const boost::bad_lexical_cast&) {}
throw InvalidConversion(QPID_MSG("Cannot convert " << s << " to bool"));
}
@@ -494,7 +494,7 @@ std::string VariantImpl::asString() const
{
switch(type) {
case VAR_VOID: return EMPTY;
- case VAR_BOOL: return value.b ? TRUE : FALSE;
+ case VAR_BOOL: return value.b ? TRUE_STRING : FALSE_STRING;
case VAR_UINT8: return boost::lexical_cast<std::string>((int) value.ui8);
case VAR_UINT16: return boost::lexical_cast<std::string>(value.ui16);
case VAR_UINT32: return boost::lexical_cast<std::string>(value.ui32);
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index 08a8c69d69..c914c50e33 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -364,6 +364,7 @@ add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py)
add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py)
if (BUILD_AMQP)
add_test (interlink_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py)
+ add_test (idle_timeout_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/idle_timeout_tests.py)
endif (BUILD_AMQP)
add_test (swig_python_tests ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/swig_python_tests${test_script_suffix})
add_test (ipv6_test ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
diff --git a/qpid/cpp/src/tests/TimerTest.cpp b/qpid/cpp/src/tests/TimerTest.cpp
index e9ca3fcdf6..d28eeeffc1 100644
--- a/qpid/cpp/src/tests/TimerTest.cpp
+++ b/qpid/cpp/src/tests/TimerTest.cpp
@@ -82,7 +82,7 @@ class TestTask : public TimerTask
uint64_t difference = _abs64(expected - actual);
#elif defined(_WIN32)
uint64_t difference = labs(expected - actual);
-#elif defined(__SUNPRO_CC)
+#elif defined(__SUNPRO_CC) || defined (__IBMCPP__)
uint64_t difference = llabs(expected - actual);
#else
uint64_t difference = abs(expected - actual);
diff --git a/qpid/cpp/src/tests/idle_timeout_tests.py b/qpid/cpp/src/tests/idle_timeout_tests.py
new file mode 100755
index 0000000000..c3cc00746b
--- /dev/null
+++ b/qpid/cpp/src/tests/idle_timeout_tests.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import shutil
+import signal
+import sys
+
+from brokertest import *
+
+class AmqpIdleTimeoutTest(BrokerTest):
+ """
+ Test AMQP 1.0 idle-timeout support
+ """
+ def setUp(self):
+ BrokerTest.setUp(self)
+ if not BrokerTest.amqp_lib:
+ raise Skipped("AMQP 1.0 library not found")
+ if qm != qpid_messaging:
+ raise Skipped("AMQP 1.0 client not found")
+ self._broker = self.broker()
+
+ def test_client_timeout(self):
+ """Ensure that the client disconnects should the broker stop
+ responding.
+ """
+ conn = self._broker.connect(native=False, timeout=None,
+ protocol="amqp1.0", heartbeat=1)
+ self.assertTrue(conn.isOpen())
+ # should disconnect within 2 seconds of broker stop
+ deadline = time.time() + 8
+ os.kill(self._broker.pid, signal.SIGSTOP)
+ while time.time() < deadline:
+ if not conn.isOpen():
+ break;
+ self.assertTrue(not conn.isOpen())
+ os.kill(self._broker.pid, signal.SIGCONT)
+
+
+ def test_broker_timeout(self):
+ """By default, the broker will adopt the same timeout as the client
+ (mimics the 0-10 timeout behavior). Verify the broker disconnects
+ unresponsive clients.
+ """
+
+ count = len(self._broker.agent.getAllConnections())
+
+ # Create a new connection to the broker:
+ receiver_cmd = ["qpid-receive",
+ "--broker", self._broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0, heartbeat:1}",
+ "--forever"]
+ receiver = self.popen(receiver_cmd, stdout=PIPE, stderr=PIPE,
+ expect=EXPECT_UNKNOWN)
+ start = time.time()
+ deadline = time.time() + 10
+ while time.time() < deadline:
+ if count < len(self._broker.agent.getAllConnections()):
+ break;
+ self.assertTrue(count < len(self._broker.agent.getAllConnections()))
+
+ # now 'hang' the client, the broker should disconnect
+ start = time.time()
+ receiver.send_signal(signal.SIGSTOP)
+ deadline = time.time() + 10
+ while time.time() < deadline:
+ if count == len(self._broker.agent.getAllConnections()):
+ break;
+ self.assertEqual(count, len(self._broker.agent.getAllConnections()))
+ receiver.send_signal(signal.SIGCONT)
+ receiver.teardown()
+
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ os.execvp("qpid-python-test",
+ ["qpid-python-test", "-m", "idle_timeout_tests"] + sys.argv[1:])