diff options
author | Alex Rudyy <orudyy@apache.org> | 2012-12-17 11:22:49 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2012-12-17 11:22:49 +0000 |
commit | b3c7409db4cded6d116d851fab5f3863afaa00c8 (patch) | |
tree | 760b01cbf475ceecfbbb28d26b29e214e0c200ab | |
parent | 7fb88b40034ad1852d35fb2ed707dd22c429dd51 (diff) | |
download | qpid-python-b3c7409db4cded6d116d851fab5f3863afaa00c8.tar.gz |
merge from trunk up to revision 1415148: missed changes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-qpid-4390@1422853 13f79535-47bb-0310-9956-ffa450edef68
143 files changed, 2030 insertions, 475 deletions
diff --git a/qpid/QPID_VERSION.txt b/qpid/QPID_VERSION.txt index caa4836d8e..5320adc1c9 100644 --- a/qpid/QPID_VERSION.txt +++ b/qpid/QPID_VERSION.txt @@ -1 +1 @@ -0.19 +0.21 diff --git a/qpid/bin/release.sh b/qpid/bin/release.sh index b0e7ffa0d3..9189cd3cf3 100755 --- a/qpid/bin/release.sh +++ b/qpid/bin/release.sh @@ -37,6 +37,7 @@ usage() echo "--source|-e : Generate the source artefact" echo "--cpp |-c : Generate the CPP artefacts" echo "--java |-j : Generate the java artefacts" + echo "--perl |-r : Generate the Perl artefacts" echo "--python|-p : Generate the python artefacts" echo "--wcf |-w : Generate the WCF artefacts" echo "--tools |-t : Generate the tools artefacts" @@ -53,6 +54,7 @@ all_artefacts() CPP="CPP" JAVA="JAVA" + PERL="PERL" PYTHON="PYTHON" WCF="WCF" TOOLS="TOOLS" @@ -94,6 +96,9 @@ for arg in $* ; do --java|-j) JAVA="JAVA" ;; + --perl|-r) + PERL="PERL" + ;; --python|-p) PYTHON="PYTHON" ;; @@ -146,7 +151,7 @@ echo REV:$REV echo VER:$VER # If nothing is specified then do it all -if [ -z "${CLEAN}${PREPARE}${CPP}${JAVA}${PYTHON}${QMF}${TOOLS}${WCF}${SOURCE}${SIGN}${UPLOAD}" ] ; then +if [ -z "${CLEAN}${PREPARE}${CPP}${JAVA}${PERL}${PYTHON}${QMF}${TOOLS}${WCF}${SOURCE}${SIGN}${UPLOAD}" ] ; then PREPARE="PREPARE" all_artefacts SIGN="SIGN" @@ -184,6 +189,24 @@ if [ "SOURCE" == "$SOURCE" ] ; then tar -czf artifacts/qpid-${VER}.tar.gz qpid-${VER} fi +if [ "PERL" == "$PERL" ]; then + pushd qpid-${VER}/cpp/bindings/qpid + make + popd + mkdir qpid-${VER}/perl-qpid-${VER} + cp qpid-${VER}/cpp/bindings/qpid/perl/perl.i \ + qpid-${VER}/cpp/bindings/qpid/perl/*pm \ + qpid-${VER}/cpp/bindings/qpid/perl/LICENSE \ + qpid-${VER}/cpp/bindings/qpid/perl/Makefile.PL \ + qpid-${VER}/perl-qpid-${VER} + mkdir qpid-${VER}/perl-qpid-${VER}/examples + cp qpid-${VER}/cpp/bindings/qpid/examples/perl/* \ + qpid-${VER}/perl-qpid-${VER}/examples + pushd qpid-${VER} + tar -czf ../artifacts/perl-qpid-${VER}.tar.gz perl-qpid-${VER} + popd +fi + if [ "PYTHON" == "$PYTHON" ] ; then tar -czf artifacts/qpid-python-${VER}.tar.gz qpid-${VER}/python qpid-${VER}/specs fi diff --git a/qpid/cpp/BuildInstallSettings.cmake b/qpid/cpp/BuildInstallSettings.cmake index 23cc24c96c..dfa88022d9 100644 --- a/qpid/cpp/BuildInstallSettings.cmake +++ b/qpid/cpp/BuildInstallSettings.cmake @@ -180,6 +180,7 @@ endif() set (QPID_INSTALL_LIBDIR ${LIB_INSTALL_DIR}) set (QPID_LOCALSTATE_DIR var) # Directory to store local state data set (QPID_MAN_DIR man) # Directory to install manual files + set (QPID_INSTALL_SYSTEMDDIR usr/lib/systemd/system) # Systemd service files set_absolute_install_path (QPIDC_MODULE_DIR ${QPID_INSTALL_LIBDIR}/qpid/client) # Directory to load client plug-in modules from set_absolute_install_path (QPIDD_MODULE_DIR ${QPID_INSTALL_LIBDIR}/qpid/daemon) # Directory to load broker plug-in modules from diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt index 0a3b1149ba..6f506a5b0e 100644 --- a/qpid/cpp/CMakeLists.txt +++ b/qpid/cpp/CMakeLists.txt @@ -68,6 +68,15 @@ set (QPIDD_CONF_FILE ${QPIDD_CONF_PATH} CACHE STRING "Name of the Qpid broker configuration file") install(FILES LICENSE NOTICE DESTINATION ${QPID_INSTALL_DOCDIR}) +install(FILES xml/cluster.xml + bindings/swig_perl_typemaps.i + bindings/swig_python_typemaps.i + bindings/swig_ruby_typemaps.i + DESTINATION ${QPID_INSTALL_DATADIR}) +install(FILES include/qpid/qpid.i + include/qmf/qmfengine.i + include/qmf/qmf2.i + DESTINATION ${QPID_INSTALL_INCLUDEDIR}) if (WIN32) set (CMAKE_DEBUG_POSTFIX "d") diff --git a/qpid/cpp/Makefile.am b/qpid/cpp/Makefile.am index 8a06948c3e..0bb2e7500a 100644 --- a/qpid/cpp/Makefile.am +++ b/qpid/cpp/Makefile.am @@ -26,7 +26,8 @@ EXTRA_DIST = \ LICENSE NOTICE README.txt SSL RELEASE_NOTES DESIGN \ xml/cluster.xml INSTALL-WINDOWS CMakeLists.txt BuildInstallSettings.cmake \ packaging/NSIS QPID_VERSION.txt bindings/CMakeLists.txt \ - bindings/swig_python_typemaps.i bindings/swig_ruby_typemaps.i bindings/swig_perl_typemaps.i + bindings/swig_python_typemaps.i bindings/swig_ruby_typemaps.i bindings/swig_perl_typemaps.i \ + include/qpid/qpid.i include/qmf/qmfengine.i include/qmf/qmf2.i SUBDIRS = managementgen etc src docs/api docs/man examples bindings/qmf bindings/qpid bindings/qmf2 @@ -36,4 +37,4 @@ libtool: $(LIBTOOL_DEPS) check-long: all $(MAKE) -C src/tests check-long -
\ No newline at end of file + diff --git a/qpid/cpp/bindings/qmf/python/CMakeLists.txt b/qpid/cpp/bindings/qmf/python/CMakeLists.txt index adeb1db78a..1768df7f85 100644 --- a/qpid/cpp/bindings/qmf/python/CMakeLists.txt +++ b/qpid/cpp/bindings/qmf/python/CMakeLists.txt @@ -21,7 +21,7 @@ ## Use Swig to generate a literal binding to the C++ API ##------------------------------------------------------ set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i PROPERTIES CPLUSPLUS ON) -set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i PROPERTIES SWIG_FLAGS "-I${qpid-cpp_SOURCE_DIR}/include") +set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i PROPERTIES SWIG_FLAGS "-I${qpid-cpp_SOURCE_DIR}/include;-I${qpid-cpp_SOURCE_DIR}/bindings") swig_add_module(qmfengine_python python ${CMAKE_CURRENT_SOURCE_DIR}/python.i) swig_link_libraries(qmfengine_python qmf qmfconsole ${PYTHON_LIBRARIES}) diff --git a/qpid/cpp/bindings/qmf/python/Makefile.am b/qpid/cpp/bindings/qmf/python/Makefile.am index bcef8c6b53..07f3c1072b 100644 --- a/qpid/cpp/bindings/qmf/python/Makefile.am +++ b/qpid/cpp/bindings/qmf/python/Makefile.am @@ -29,7 +29,7 @@ EXTRA_DIST = CMakeLists.txt python.i BUILT_SOURCES = $(generated_file_list) SWIG_FLAGS = -w362,401 -$(generated_file_list): $(srcdir)/python.i $(srcdir)/../qmfengine.i +$(generated_file_list): $(srcdir)/python.i $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o qmfengine.cpp $(srcdir)/python.i pylibdir = $(pyexecdir) diff --git a/qpid/cpp/bindings/qmf/python/python.i b/qpid/cpp/bindings/qmf/python/python.i index 5e25d155f9..118d0d3dbd 100644 --- a/qpid/cpp/bindings/qmf/python/python.i +++ b/qpid/cpp/bindings/qmf/python/python.i @@ -139,5 +139,5 @@ -%include "../qmfengine.i" +%include "qmf/qmfengine.i" diff --git a/qpid/cpp/bindings/qmf/ruby/CMakeLists.txt b/qpid/cpp/bindings/qmf/ruby/CMakeLists.txt index ce853b767b..1fb2542e46 100644 --- a/qpid/cpp/bindings/qmf/ruby/CMakeLists.txt +++ b/qpid/cpp/bindings/qmf/ruby/CMakeLists.txt @@ -22,7 +22,9 @@ ##------------------------------------------------------ set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/ruby.i PROPERTIES CPLUSPLUS ON) -include_directories(${RUBY_INCLUDE_DIRS} ${qpid-cpp_SOURCE_DIR}/include) +include_directories(${RUBY_INCLUDE_DIRS} + ${qpid-cpp_SOURCE_DIR}/include + ${qpid-cpp_SOURCE_DIR}/bindings) swig_add_module(qmfengine_ruby ruby ${CMAKE_CURRENT_SOURCE_DIR}/ruby.i) swig_link_libraries(qmfengine_ruby qmf qmfconsole ${RUBY_LIBRARY}) diff --git a/qpid/cpp/bindings/qmf/ruby/Makefile.am b/qpid/cpp/bindings/qmf/ruby/Makefile.am index 1c7f67edb3..33393aeda0 100644 --- a/qpid/cpp/bindings/qmf/ruby/Makefile.am +++ b/qpid/cpp/bindings/qmf/ruby/Makefile.am @@ -29,7 +29,7 @@ rubylibdir = $(RUBY_LIB) dist_rubylib_DATA = qmf.rb -qmfengine.cpp: $(srcdir)/ruby.i $(srcdir)/../qmfengine.i +qmfengine.cpp: $(srcdir)/ruby.i $(SWIG) -ruby -c++ $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o qmfengine.cpp $(srcdir)/ruby.i rubylibarchdir = $(RUBY_LIB_ARCH) diff --git a/qpid/cpp/bindings/qmf/ruby/ruby.i b/qpid/cpp/bindings/qmf/ruby/ruby.i index 0101861100..2854aa0c7e 100644 --- a/qpid/cpp/bindings/qmf/ruby/ruby.i +++ b/qpid/cpp/bindings/qmf/ruby/ruby.i @@ -102,5 +102,5 @@ } -%include "../qmfengine.i" +%include "qmf/qmfengine.i" diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am index 062fbd0a85..b0321d4e5d 100644 --- a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am +++ b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am @@ -24,13 +24,13 @@ AM_CPPFLAGS = $(INCLUDE) noinst_PROGRAMS=agent event_driven_list_agents list_agents print_events agent_SOURCES=agent.cpp -agent_LDADD=$(top_builddir)/src/libqmf2.la +agent_LDADD=$(top_builddir)/src/libqmf2.la -lqpidtypes -lqpidmessaging list_agents_SOURCES=list_agents.cpp -list_agents_LDADD=$(top_builddir)/src/libqmf2.la +list_agents_LDADD=$(top_builddir)/src/libqmf2.la -lqpidmessaging event_driven_list_agents_SOURCES=event_driven_list_agents.cpp -event_driven_list_agents_LDADD=$(top_builddir)/src/libqmf2.la +event_driven_list_agents_LDADD=$(top_builddir)/src/libqmf2.la -lqpidmessaging print_events_SOURCES=print_events.cpp -print_events_LDADD=$(top_builddir)/src/libqmf2.la +print_events_LDADD=$(top_builddir)/src/libqmf2.la -lqpidtypes -lqpidmessaging diff --git a/qpid/cpp/bindings/qmf2/python/CMakeLists.txt b/qpid/cpp/bindings/qmf2/python/CMakeLists.txt index 2a823ff1ee..1c8447116e 100644 --- a/qpid/cpp/bindings/qmf2/python/CMakeLists.txt +++ b/qpid/cpp/bindings/qmf2/python/CMakeLists.txt @@ -21,7 +21,7 @@ ## Use Swig to generate a literal binding to the C++ API ##------------------------------------------------------ set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i PROPERTIES CPLUSPLUS ON) -set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i PROPERTIES SWIG_FLAGS "-I${qpid-cpp_SOURCE_DIR}/include") +set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i PROPERTIES SWIG_FLAGS "-I${qpid-cpp_SOURCE_DIR}/include;-I${qpid-cpp_SOURCE_DIR}/bindings") swig_add_module(cqmf2_python python ${CMAKE_CURRENT_SOURCE_DIR}/python.i) swig_link_libraries(cqmf2_python qmf2 ${PYTHON_LIBRARIES}) diff --git a/qpid/cpp/bindings/qmf2/python/Makefile.am b/qpid/cpp/bindings/qmf2/python/Makefile.am index 591c1408c0..309e8f8dad 100644 --- a/qpid/cpp/bindings/qmf2/python/Makefile.am +++ b/qpid/cpp/bindings/qmf2/python/Makefile.am @@ -19,7 +19,7 @@ if HAVE_PYTHON_DEVEL -INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src $(QMF_INCLUDES) +INCLUDES = -I$(top_srcdir)/include -I$(top_srcdir)/bindings -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src $(QMF_INCLUDES) generated_file_list = \ cqmf2.cpp \ @@ -29,7 +29,7 @@ EXTRA_DIST = CMakeLists.txt python.i BUILT_SOURCES = $(generated_file_list) SWIG_FLAGS = -w362,401 -$(generated_file_list): $(srcdir)/python.i $(srcdir)/../qmf2.i $(srcdir)/../../swig_python_typemaps.i +$(generated_file_list): $(srcdir)/python.i $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o cqmf2.cpp $(srcdir)/python.i pylibdir = $(pyexecdir) diff --git a/qpid/cpp/bindings/qmf2/python/python.i b/qpid/cpp/bindings/qmf2/python/python.i index 02dd1632b0..39193a2475 100644 --- a/qpid/cpp/bindings/qmf2/python/python.i +++ b/qpid/cpp/bindings/qmf2/python/python.i @@ -19,7 +19,7 @@ %module cqmf2 %include "std_string.i" -%include "../../swig_python_typemaps.i" +%include "swig_python_typemaps.i" /* Define the general-purpose exception handling */ %exception { @@ -37,5 +37,5 @@ } } -%include "../qmf2.i" +%include "qmf/qmf2.i" diff --git a/qpid/cpp/bindings/qmf2/ruby/CMakeLists.txt b/qpid/cpp/bindings/qmf2/ruby/CMakeLists.txt index f3d2cd0d9e..70b3e917f9 100644 --- a/qpid/cpp/bindings/qmf2/ruby/CMakeLists.txt +++ b/qpid/cpp/bindings/qmf2/ruby/CMakeLists.txt @@ -22,7 +22,9 @@ ##------------------------------------------------------ set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/ruby.i PROPERTIES CPLUSPLUS ON) -include_directories(${RUBY_INCLUDE_DIRS} ${qpid-cpp_SOURCE_DIR}/include) +include_directories(${RUBY_INCLUDE_DIRS} + ${qpid-cpp_SOURCE_DIR}/include + ${qpid-cpp_SOURCE_DIR}/bindings) swig_add_module(cqmf2_ruby ruby ${CMAKE_CURRENT_SOURCE_DIR}/ruby.i) swig_link_libraries(cqmf2_ruby qmf2 ${RUBY_LIBRARY}) diff --git a/qpid/cpp/bindings/qmf2/ruby/Makefile.am b/qpid/cpp/bindings/qmf2/ruby/Makefile.am index a03bd6d5e6..9952edb972 100644 --- a/qpid/cpp/bindings/qmf2/ruby/Makefile.am +++ b/qpid/cpp/bindings/qmf2/ruby/Makefile.am @@ -19,7 +19,7 @@ if HAVE_RUBY_DEVEL -INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src $(QMF_INCLUDES) +INCLUDES = -I$(top_srcdir)/include -I$(top_srcdir)/bindings -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src $(QMF_INCLUDES) EXTRA_DIST = CMakeLists.txt ruby.i BUILT_SOURCES = cqmf2.cpp @@ -27,7 +27,7 @@ SWIG_FLAGS = -w362,401 rubylibdir = $(RUBY_LIB) -cqmf2.cpp: $(srcdir)/ruby.i $(srcdir)/../qmf2.i $(srcdir)/../../swig_ruby_typemaps.i +cqmf2.cpp: $(srcdir)/ruby.i $(SWIG) -ruby -c++ $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o cqmf2.cpp $(srcdir)/ruby.i rubylibarchdir = $(RUBY_LIB_ARCH) diff --git a/qpid/cpp/bindings/qmf2/ruby/ruby.i b/qpid/cpp/bindings/qmf2/ruby/ruby.i index 82c963c7d6..ea14904be8 100644 --- a/qpid/cpp/bindings/qmf2/ruby/ruby.i +++ b/qpid/cpp/bindings/qmf2/ruby/ruby.i @@ -21,7 +21,7 @@ /* Ruby doesn't have a != operator*/ #pragma SWIG nowarn=378 %include "std_string.i" -%include "../../swig_ruby_typemaps.i" +%include "swig_ruby_typemaps.i" /* Define the general-purpose exception handling */ %exception { @@ -34,4 +34,4 @@ } } -%include "../qmf2.i" +%include "qmf/qmf2.i" diff --git a/qpid/cpp/bindings/qpid/Makefile.am b/qpid/cpp/bindings/qpid/Makefile.am index 96e2a58414..77eba6a524 100644 --- a/qpid/cpp/bindings/qpid/Makefile.am +++ b/qpid/cpp/bindings/qpid/Makefile.am @@ -21,7 +21,7 @@ SUBDIRS = dotnet if HAVE_SWIG -EXTRA_DIST = qpid.i +EXTRA_DIST = CMakeLists.txt qpid.i if HAVE_RUBY_DEVEL SUBDIRS += ruby @@ -33,18 +33,18 @@ endif if HAVE_PERL_DEVEL -INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src -I$(PERL_INC) +INCLUDES = -I$(top_srcdir)/include -I$(top_srcdir)/bindings -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src -I$(PERL_INC) EXTRA_DIST += perl/perl.i perl/CMakeLists.txt BUILT_SOURCES = perl/cqpid_perl.cpp SWIG_FLAGS = -w362,401 -perl/cqpid_perl.cpp: $(srcdir)/perl/perl.i $(srcdir)/qpid.i $(srcdir)/../swig_perl_typemaps.i +perl/cqpid_perl.cpp: $(srcdir)/perl/perl.i $(SWIG) -perl -c++ $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o perl/cqpid_perl.cpp $(srcdir)/perl/perl.i perl/Makefile: perl/cqpid_perl.cpp cd perl; \ - $(PERL) Makefile.PL PREFIX=$(prefix) LIB=$(PERL_ARCHLIB) ; \ + $(PERL) Makefile.PL PREFIX=$(prefix) ; \ cd .. all-local: perl/Makefile @@ -54,7 +54,7 @@ all-local: perl/Makefile install-exec-local: cd perl ; \ - $(MAKE) pure_install DESTDIR=$(prefix) ; \ + $(MAKE) pure_install ; \ cd .. clean-local: diff --git a/qpid/cpp/bindings/qpid/examples/perl/README b/qpid/cpp/bindings/qpid/examples/perl/README deleted file mode 100644 index 1e113f1fa0..0000000000 --- a/qpid/cpp/bindings/qpid/examples/perl/README +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -The examples in this directory are written against the raw Perl -binding ("cqpid"). This binding is identical to the C++ messaging (in -namespace qpid::messaging). - -It is desired that a layer will be written over this interface (called -"qpid") that provides a more Perl-specific API. When this occurs, -these examples will be changed to use the new Perl API. - diff --git a/qpid/cpp/bindings/qpid/examples/perl/client.pl b/qpid/cpp/bindings/qpid/examples/perl/client.pl index 19d9d3f14f..5f9d1ff130 100644..100755 --- a/qpid/cpp/bindings/qpid/examples/perl/client.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/client.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,23 +20,23 @@ use strict; use warnings; -use cqpid_perl; +use qpid; my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; -my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; +my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +my $connection = new qpid::messaging::Connection($url, $connectionOptions); eval { $connection->open(); -my $session = $connection->createSession(); +my $session = $connection->create_session(); -my $sender = $session->createSender("service_queue"); +my $sender = $session->create_sender("service_queue"); #create temp queue & receiver... -my $responseQueue = new cqpid_perl::Address("#response-queue; {create:always, delete:always}"); -my $receiver = $session->createReceiver($responseQueue); +my $responseQueue = new qpid::messaging::Address("#response-queue; {create:always, delete:always}"); +my $receiver = $session->create_receiver($responseQueue); #Now send some messages... @@ -47,13 +47,13 @@ my @s = ( "And the mome raths outgrabe." ); -my $request = new cqpid_perl::Message(); -$request->setReplyTo($responseQueue); +my $request = new qpid::messaging::Message(); +$request->set_reply_to($responseQueue); for (my $i=0; $i<4; $i++) { - $request->setContent($s[$i]); + $request->set_content($s[$i]); $sender->send($request); my $response = $receiver->fetch(); - print $request->getContent() . " -> " . $response->getContent() . "\n"; + print $request->get_content() . " -> " . $response->get_content() . "\n"; } $connection->close(); diff --git a/qpid/cpp/bindings/qpid/examples/perl/drain.pl b/qpid/cpp/bindings/qpid/examples/perl/drain.pl index 60ac0c50ed..2da28f2867 100644..100755 --- a/qpid/cpp/bindings/qpid/examples/perl/drain.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/drain.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,15 +20,17 @@ use strict; use warnings; -use cqpid_perl; +use qpid; use Getopt::Long; +use Pod::Usage; my $url = "127.0.0.1"; -my $timeout = 60; +my $timeout = 0; my $forever = 0; -my $count = 1; +my $count = 0; my $connectionOptions = ""; my $address = "amq.direct"; +my $help; my $result = GetOptions( "broker|b=s" => \ $url, @@ -36,48 +38,59 @@ my $result = GetOptions( "forever|f" => \ $forever, "connection-options=s" => \ $connectionOptions, "count|c=i" => \ $count, -); + "help|h" => \ $help + ) || pod2usage(-verbose => 0); -if (! $result) { - print "Usage: perl drain.pl [OPTIONS]\n"; -} +pod2usage(-verbose => 1) if $help; if ($#ARGV ge 0) { $address = $ARGV[0] } sub getTimeout { - return ($forever) ? $cqpid_perl::Duration::FOREVER : new cqpid_perl::Duration($timeout*1000); + return ($forever) ? qpid::messaging::Duration::FOREVER : new qpid::messaging::Duration($timeout*1000); } +sub printProperties { + my $h = shift(); + return qq[{${\(join', ',map"'$_': '$h->{$_}'",keys%$h)}}] +} -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +my $connection = new qpid::messaging::Connection($url, $connectionOptions); eval { $connection->open(); - my $session = $connection->createSession(); - my $receiver = $session->createReceiver($address); + my $session = $connection->create_session(); + my $receiver = $session->create_receiver($address); my $timeout = getTimeout(); - - my $message = new cqpid_perl::Message(); + my $message = new qpid::messaging::Message(); my $i = 0; - while($receiver->fetch($message, $timeout)) { - print "Message(properties=" . $message->getProperties() . ",content='"; - if ($message->getContentType() eq "amqp/map") { - my $content = cqpid_perl::decodeMap($message); + for (;;) { + eval { + $message = $receiver->fetch($timeout); + }; + + if ($@) { + last; + } + + my $redelivered = ($message->get_redelivered) ? "redelivered=True, " : ""; + print "Message(" . $redelivered . "properties=" . printProperties($message->get_properties()) . ", content='"; + if ($message->get_content_type() eq "amqp/map") { + my $content = qpid::messasging::decode_map($message); map{ print "\n$_ => $content->{$_}"; } keys %{$content}; } else { - print $message->getContent(); + print $message->get_content(); } print "')\n"; - - my $replyto = $message->getReplyTo(); - if ($replyto->getName()) { - print "Replying to " . $message->getReplyTo()->str() . "...\n"; - my $sender = $session->createSender($replyto); - my $response = new cqpid_perl::Message("received by the server."); + + my $replyto = $message->get_reply_to(); + if ($replyto->get_name()) { + print "Replying to " . $message->get_reply_to()->str() . "...\n"; + my $sender = $session->create_sender($replyto); + my $response = new qpid::messaging::Message("received by the server."); $sender->send($response); } $session->acknowledge(); @@ -86,6 +99,7 @@ eval { last; } } + $receiver->close(); $session->close(); $connection->close(); @@ -96,3 +110,21 @@ if ($@) { die $@; } +__END__ + +=head1 NAME + +drain - Drains messages from the specified address + +=head1 SYNOPSIS + + Options: + -h, --help show this message + -b VALUE, --broker VALUE url of broker to connect to + -t VALUE, --timeout VALUE timeout in seconds to wait before exiting + -f, --forever ignore timeout and wait forever + --connection-options VALUE connection options string in the form {name1:value1, name2:value2} + -c VALUE, --count VALUE number of messages to read before exiting + +=cut + diff --git a/qpid/cpp/bindings/qpid/examples/perl/hello_world.pl b/qpid/cpp/bindings/qpid/examples/perl/hello_world.pl index a96b98a002..faf9cd3638 100644..100755 --- a/qpid/cpp/bindings/qpid/examples/perl/hello_world.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/hello_world.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -21,33 +21,27 @@ use strict; use warnings; use Data::Dumper; -use cqpid_perl; +use qpid; my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672"; my $address = ( @ARGV > 1 ) ? $ARGV[0] : "amq.topic"; my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[1] : ""; -my $connection = new cqpid_perl::Connection($broker, $connectionOptions); +my $connection = new qpid::messaging::Connection($broker, $connectionOptions); eval { $connection->open(); - my $session = $connection->createSession(); - my $receiver = $session->createReceiver($address); - my $sender = $session->createSender($address); + my $session = $connection->create_session(); - $sender->send(new cqpid_perl::Message("Hello world!")); + my $receiver = $session->create_receiver($address); + my $sender = $session->create_sender($address); - #my $duration = new cqpid_perl::Duration(1000); - #print ">>>" . $duration->getMilliseconds() . "\n"; + $sender->send(new qpid::messaging::Message("Hello world!")); - my $message = $receiver->fetch($cqpid_perl::Duration::SECOND); + my $message = $receiver->fetch(qpid::messaging::Duration::SECOND); - #$message->setDurable(1); - #print "Durable: " . $message->getDurable() . "\n"; - #print Dumper($message->getProperties()); - - print $message->getContent() . "\n"; + print $message->get_content() . "\n"; $session->acknowledge(); $connection->close(); diff --git a/qpid/cpp/bindings/qpid/examples/perl/hello_xml.pl b/qpid/cpp/bindings/qpid/examples/perl/hello_xml.pl index cebf2ceee6..51ccb8bd23 100644..100755 --- a/qpid/cpp/bindings/qpid/examples/perl/hello_xml.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/hello_xml.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,7 +20,7 @@ use strict; use warnings; -use cqpid_perl; +use qpid; my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672"; my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; @@ -36,7 +36,7 @@ END my $address = <<END; xml-exchange; { -create: always, +create: always, node: { type: topic, x-declare: { type: xml } }, link: { x-bindings: [{ exchange: xml-exchange, key: weather, arguments: { xquery:" $query" } }] @@ -44,15 +44,15 @@ x-bindings: [{ exchange: xml-exchange, key: weather, arguments: { xquery:" $quer END -my $connection = new cqpid_perl::Connection($broker, $connectionOptions); +my $connection = new qpid::messaging::Connection($broker, $connectionOptions); eval { $connection->open(); - my $session = $connection->createSession(); + my $session = $connection->create_session(); - my $receiver = $session->createReceiver($address); - - my $message = new cqpid_perl::Message(); + my $receiver = $session->create_receiver($address); + + my $message = new qpid::messaging::Message(); my $content = <<END; <weather> @@ -62,13 +62,13 @@ eval { <dewpoint>35</dewpoint> </weather> END - - $message->setContent($content); - my $sender = $session->createSender('xml-exchange/weather'); + + $message->set_content($content); + my $sender = $session->create_sender('xml-exchange/weather'); $sender->send($message); - + my $response = $receiver->fetch(); - print $response->getContent() . "\n"; + print $response->get_content() . "\n"; $connection->close(); }; diff --git a/qpid/cpp/bindings/qpid/examples/perl/map_receiver.pl b/qpid/cpp/bindings/qpid/examples/perl/map_receiver.pl index 2e2611e38f..048209fe75 100644..100755 --- a/qpid/cpp/bindings/qpid/examples/perl/map_receiver.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/map_receiver.pl @@ -1,4 +1,4 @@ -#! /usr/bin/perl5 +#! /usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -21,22 +21,21 @@ use strict; use warnings; use Data::Dumper; -use cqpid_perl; +use qpid; my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; my $address = ( @ARGV > 1 ) ? $ARGV[0] : "message_queue; {create: always}"; my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[1] : ""; -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +my $connection = new qpid::messaging::Connection($url, $connectionOptions); eval { $connection->open(); - my $session = $connection->createSession(); - my $receiver = $session->createReceiver($address); + my $session = $connection->create_session(); + my $receiver = $session->create_receiver($address); + + my $content = qpid::messaging::decode_map($receiver->fetch()); - my $content = cqpid_perl::decodeMap($receiver->fetch()); - #my $content = cqpid_perl::decodeList($receiver->fetch()); - print Dumper($content); $session->acknowledge(); diff --git a/qpid/cpp/bindings/qpid/examples/perl/map_sender.pl b/qpid/cpp/bindings/qpid/examples/perl/map_sender.pl index 4107cd48b9..4aae7b6b0e 100644..100755 --- a/qpid/cpp/bindings/qpid/examples/perl/map_sender.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/map_sender.pl @@ -1,4 +1,4 @@ -#! /usr/bin/perl5 +#! /usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -21,27 +21,27 @@ use strict; use warnings; use Data::Dumper; -use cqpid_perl; +use qpid; my $url = ( @ARGV > 0 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; my $address = ( @ARGV > 1 ) ? $ARGV[1] : "message_queue; {create: always}"; my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[2] : ""; -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +my $connection = new qpid::messaging::Connection($url, $connectionOptions); eval { $connection->open(); - my $session = $connection->createSession(); - my $sender = $session->createSender($address); + my $session = $connection->create_session(); + my $sender = $session->create_sender($address); - my $message = new cqpid_perl::Message(); - my $content = { id => 987654321, - name => "Widget", - percent => sprintf("%.2f", 0.99), - colours => [ qw (red green white) ], + my $message = new qpid::messaging::Message(); + my $content = { id => 987654321, + name => "Widget", + percent => sprintf("%.2f", 0.99), + colours => [ qw (red green white) ], }; - cqpid_perl::encode($content, $message); + qpid::messaging::encode($content, $message); $sender->send($message, 1); $connection->close(); diff --git a/qpid/cpp/bindings/qpid/examples/perl/server.pl b/qpid/cpp/bindings/qpid/examples/perl/server.pl index b14da565b9..7dbe7bc51c 100644..100755 --- a/qpid/cpp/bindings/qpid/examples/perl/server.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/server.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,34 +20,35 @@ use strict; use warnings; -use cqpid_perl; +use qpid; my $url = ( @ARGV == 1 ) ? $ARGV[0] : "amqp:tcp:127.0.0.1:5672"; -my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; +my $connectionOptions = ( @ARGV > 1 ) ? $ARGV[1] : ""; -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +my $connection = new qpid::messaging::Connection($url, $connectionOptions); eval { $connection->open(); - my $session = $connection->createSession(); + my $session = $connection->create_session(); - my $receiver = $session->createReceiver("service_queue; {create: always}"); + my $receiver = $session->create_receiver("service_queue; {create: always}"); while (1) { my $request = $receiver->fetch(); - my $address = $request->getReplyTo(); + my $address = $request->get_reply_to(); + if ($address) { - my $sender = $session->createSender($address); - my $s = $request->getContent(); + my $sender = $session->create_sender($address); + my $s = $request->get_content(); $s = uc($s); - my $response = new cqpid_perl::Message($s); + my $response = new qpid::messaging::Message($s); $sender->send($response); - print "Processed request: " . $request->getContent() . " -> " . $response->getContent() . "\n"; + print "Processed request: " . $request->get_content() . " -> " . $response->get_content() . "\n"; $session->acknowledge(); } else { - print "Error: no reply address specified for request: " . $request->getContent() . "\n"; + print "Error: no reply address specified for request: " . $request->get_content() . "\n"; $session->reject($request); } } diff --git a/qpid/cpp/bindings/qpid/examples/perl/spout.pl b/qpid/cpp/bindings/qpid/examples/perl/spout.pl index 7365e732bf..af87b0a348 100644..100755 --- a/qpid/cpp/bindings/qpid/examples/perl/spout.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/spout.pl @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -20,8 +20,9 @@ use strict; use warnings; -use cqpid_perl; +use qpid; use Getopt::Long; +use Pod::Usage; use Time::Local; my $url = "127.0.0.1"; @@ -34,6 +35,7 @@ my @entries; my $content = ""; my $connectionOptions = ""; my $address = "amq.direct"; +my $help; my $result = GetOptions( "broker|b=s" => \ $url, @@ -44,20 +46,16 @@ my $result = GetOptions( "property|p=s@" => \ @properties, "map|m=s@" => \ @entries, "content=s" => \ $content, - "connection-options=s" => \ $connectionOptions, -); - - -if (! $result) { - print "Usage: perl drain.pl [OPTIONS]\n"; -} + "connection-options=s" => \ $connectionOptions, + "help|h" => \ $help + ) || pod2usage(-verbose => 0); +pod2usage(-verbose => 1) if $help; if ($#ARGV ge 0) { $address = $ARGV[0] } - sub setEntries { my ($content) = @_; @@ -73,23 +71,23 @@ sub setProperties { foreach (@properties) { my ($name, $value) = split("=", $_); - $message->getProperties()->{$name} = $value; + $message->setProperty($name, $value); } } -my $connection = new cqpid_perl::Connection($url, $connectionOptions); +my $connection = new qpid::messaging::Connection($url, $connectionOptions); eval { $connection->open(); - my $session = $connection->createSession(); - my $sender = $session->createSender($address); + my $session = $connection->create_session(); + my $sender = $session->create_sender($address); - my $message = new cqpid_perl::Message(); + my $message = new qpid::messaging::Message(); setProperties($message) if (@properties); if (@entries) { my $content = {}; setEntries($content); - cqpid_perl::encode($content, $message); + qpid::messaging::encode($content, $message); } elsif ($content) { $message->setContent($content); @@ -98,7 +96,7 @@ eval { my $receiver; if ($replyto) { - my $responseQueue = new cqpid_perl::Address($replyto); + my $responseQueue = new qpid::messaging::Address($replyto); $receiver = $session->createReceiver($responseQueue); $message->setReplyTo($responseQueue); } @@ -108,9 +106,9 @@ eval { my $s = "$s[3]$s[4]$s[5]"; my $n = $s; - for (my $i = 0; + for (my $i = 0; ($i < $count || $count == 0) and - ($timeout == 0 || abs($n - $s) < $timeout); + ($timeout == 0 || abs($n - $s) < $timeout); $i++) { $sender->send($message); @@ -134,3 +132,26 @@ if ($@) { } +__END__ + +=head1 NAME + +spout - Send messages to the specified address + +=head1 SYNOPSIS + + Usage: spout [OPTIONS] ADDRESS + + Options: + -h, --help show this message + -b VALUE, --broker VALUE url of broker to connect to + -t VALUE, --timeout VALUE exit after the specified time + -c VALUE, --count VALUE stop after count messageshave been sent, zero disables + -i VALUE, --id VALUE use the supplied id instead of generating one + --reply-to VALUE specify reply-to value + -P VALUE, --property VALUE specify message property + -M VALUE, --map VALUE specify entry for map content + --content VALUE specify textual content + --connection-options VALUE connection options string in the form {name1:value1, name2:value2} + +=cut diff --git a/qpid/cpp/bindings/qpid/perl/CMakeLists.txt b/qpid/cpp/bindings/qpid/perl/CMakeLists.txt index 3a47303dd7..a1380fa4d0 100644 --- a/qpid/cpp/bindings/qpid/perl/CMakeLists.txt +++ b/qpid/cpp/bindings/qpid/perl/CMakeLists.txt @@ -21,18 +21,25 @@ ## Use Swig to generate a literal binding to the C++ API ##------------------------------------------------------ set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/perl.i PROPERTIES CPLUSPLUS ON) -set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/perl.i PROPERTIES SWIG_FLAGS "-I${qpid-cpp_SOURCE_DIR}/include") +set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/perl.i + PROPERTIES SWIG_FLAGS "-I${qpid-cpp_SOURCE_DIR}/include;-I${qpid-cpp_SOURCE_DIR}/include;-I${qpid-cpp_SOURCE_DIR}/bindings") swig_add_module(cqpid_perl perl ${CMAKE_CURRENT_SOURCE_DIR}/perl.i) swig_link_libraries(cqpid_perl qpidmessaging qpidtypes qmf2 ${PERL_LIBRARY}) -set_source_files_properties(${swig_generated_file_fullname} PROPERTIES COMPILE_FLAGS "-fno-strict-aliasing -I${PERL_INCLUDE_PATH} -I${qpid-cpp_SOURCE_DIR}/include") +set_source_files_properties(${swig_generated_file_fullname} PROPERTIES COMPILE_FLAGS "-fno-strict-aliasing") +include_directories(${PERL_INCLUDE_PATH} + ${qpid-cpp_SOURCE_DIR}/include + ${qpid-cpp_SOURCE_DIR}/bindings) ##---------------------------------- ## Install the complete Perl binding ##---------------------------------- install(FILES ${CMAKE_CURRENT_BINARY_DIR}/libcqpid_perl.so ${CMAKE_CURRENT_BINARY_DIR}/cqpid_perl.pm + ${CMAKE_CURRENT_SOURCE_DIR}/qpid.pm + ${CMAKE_CURRENT_SOURCE_DIR}/LICENSE + ${CMAKE_CURRENT_SOURCE_DIR}/Makefile.PL DESTINATION ${PERL_PFX_ARCHLIB} COMPONENT ${QPID_COMPONENT_CLIENT} ) diff --git a/qpid/cpp/bindings/qpid/perl/LICENSE b/qpid/cpp/bindings/qpid/perl/LICENSE new file mode 100644 index 0000000000..bc46b77047 --- /dev/null +++ b/qpid/cpp/bindings/qpid/perl/LICENSE @@ -0,0 +1,206 @@ +========================================================================= +== Apache License == +========================================================================= + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/qpid/cpp/bindings/qpid/perl/Makefile.PL b/qpid/cpp/bindings/qpid/perl/Makefile.PL new file mode 100644 index 0000000000..7a4d7f03dc --- /dev/null +++ b/qpid/cpp/bindings/qpid/perl/Makefile.PL @@ -0,0 +1,13 @@ +#!/usr/bin/perl -w + +use strict; + +use ExtUtils::MakeMaker; +use Config; + +WriteMakefile( + NAME => 'cqpid_perl', + PREREQ_PM => {}, + LIBS => ["-lqpidmessaging -lqpidtypes"], + C => ['cqpid_perl.cpp'], +); diff --git a/qpid/cpp/bindings/qpid/perl/perl.i b/qpid/cpp/bindings/qpid/perl/perl.i index 38ac91761f..aec2ee1c9e 100644 --- a/qpid/cpp/bindings/qpid/perl/perl.i +++ b/qpid/cpp/bindings/qpid/perl/perl.i @@ -19,7 +19,7 @@ %module cqpid_perl %include "std_string.i" -%include "../../swig_perl_typemaps.i" +%include "swig_perl_typemaps.i" /* Define the general-purpose exception handling */ %exception { @@ -31,5 +31,5 @@ } } -%include "../qpid.i" +%include "qpid/qpid.i" diff --git a/qpid/cpp/bindings/qpid/perl/qpid.pm b/qpid/cpp/bindings/qpid/perl/qpid.pm new file mode 100644 index 0000000000..a0f8ef7aa2 --- /dev/null +++ b/qpid/cpp/bindings/qpid/perl/qpid.pm @@ -0,0 +1,840 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +use strict; +use warnings; +use cqpid_perl; + +package qpid::messaging; + +sub encode { + my $content = $_[0]; + my $message = $_[1]; + + cqpid_perl::encode($content, $message->get_implementation()); +} + +sub decode_map { + my $message = $_[0]; + + return cqpid_perl::decodeMap($message->get_implementation()); +} + + + +package qpid::messaging::Address; + +use overload ( + 'bool' => \&boolify, + ); + +sub boolify { + my ($self) = @_; + my $impl = $self->{_impl}; + + return length($impl->getName()); +} + +sub new { + my ($class) = @_; + my ($self) = {}; + + # 2 args: either a string address or a cqpid_perl::Address + # 3+ args: name + subject + options + type + if (@_ eq 2) { + my $address = $_[1]; + + if (ref($address) eq 'cqpid_perl::Address') { + $self->{_impl} = $address; + } else { + $self->{_impl} = new cqpid_perl::Address($_[1]); + } + } elsif (@_ >= 4) { + my $impl = new cqpid_perl::Address($_[1], $_[2], $_[3]); + + $impl->setType($_[4]) if @_ >= 5; + + $self->{_impl} = $impl; + } else { + die "You must specify an address." + } + + bless $self, $class; + return $self; +} + +sub get_implementation { + my ($self) = @_; + return $self->{_impl}; +} + +sub set_name { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setName($_[1]); +} + +sub get_name { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getName(); +} + +sub set_subject { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setSubject($_[1]); +} + +sub get_subject { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getSubject; +} + +sub set_options { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setOptions($_[1]); +} + +sub get_options { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getOptions; +} + +sub set_type { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setType($_[1]); +} + +sub get_type { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getType; +} + + + +package qpid::messaging::Duration; + +sub new { + my ($class) = @_; + my ($self) = { + _impl => new cqpid_perl::Duration($_[1]), + }; + + bless $self, $class; + return $self; +} + +sub get_milliseconds { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getMilliseconds(); +} + +sub get_implementation { + my ($self) = @_; + + return $self->{_impl}; +} + +# TODO: Need a better way to define FOREVER +use constant { + FOREVER => new qpid::messaging::Duration(10000), + IMMEDIATE => new qpid::messaging::Duration(0), + SECOND => new qpid::messaging::Duration(1000), + MINUTE => new qpid::messaging::Duration(60000), +}; + + + +package qpid::messaging::Message; + +sub new { + my ($class) = @_; + my $content = $_[1] if (@_ > 1); + my $impl = $_[2] if (@_ > 2); + my ($self) = { + _content => $content || "", + _impl => $impl || undef, + }; + + unless (defined($self->{_impl})) { + my $impl = new cqpid_perl::Message($self->{_content}); + + $self->{_impl} = $impl; + } + + bless $self, $class; + return $self; +} + +sub get_implementation { + my ($self) = @_; + + return $self->{_impl}; +} + +sub set_reply_to { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setReplyTo($_[1]->get_implementation()); +} + +sub get_reply_to { + my ($self) = @_; + my $impl = $self->{_impl}; + + return new qpid::messaging::Address($impl->getReplyTo()); +} + +sub set_subject { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setSubject($_[1]); +} + +sub get_subject { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getSubject; +} + +sub set_content_type { + my ($self) = @_; + my $type = $_[1]; + + my $impl = $self->{_impl}; + $impl->setContentType($type); +} + +sub get_content_type { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getContentType; +} + +sub set_message_id { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setMessageId($_[1]); +} + +sub get_message_id { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getMessageId; +} + +sub set_user_id { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setUserId($_[1]); +} + +sub get_user_id { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getUserId; +} + +sub set_correlation_id { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setCorrelationId($_[1]); +} + +sub get_correlation_id { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getCorrelationId; +} + +sub set_priority { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setPriority($_[1]); +} + +sub get_priority { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getPriority; +} + +sub set_ttl { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setTtl($_[1]); +} + +sub get_ttl { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getTtl; +} + +sub set_durable { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setDurable($_[1]); +} + +sub get_durable { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getDurable; +} + +sub set_redelivered { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setRedelivered($_[1]); +} + +sub get_redelivered { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getRedelivered; +} + +sub get_property { + my ($self) = @_; + my $key = $_[1]; + + my $impl = $self->{_impl}; + + return $impl->getPropert($key); +} + +sub get_properties { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getProperties; +} + +sub set_content { + my ($self) = @_; + my $content = $_[1] || ""; + my $impl = $self->{_impl}; + + $impl->setContent($content); +} + +sub get_content { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getContent(); +} + +sub get_content_size { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getContentSize; +} + + + +package qpid::messaging::Sender; + +sub new { + my ($class) = @_; + my ($self) = { + _impl => $_[1], + _session => $_[2], + }; + + die "Must provide an implementation." unless defined($self->{_impl}); + die "Must provide a Session." unless defined($self->{_session}); + + bless $self, $class; + return $self; +} + +sub send { + my ($self) = @_; + my $message = $_[1]; + my $sync = $_[2] || 0; + + die "No message to send." unless defined($message); + + my $impl = $self->{_impl}; + + $impl->send($message->get_implementation, $sync); +} + +sub close { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->close; +} + +sub set_capacity { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->setCapacity($_[1]); +} + +sub get_capacity { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getCapacity; +} + +sub get_unsettled { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getUnsettled; +} + +sub get_available { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getAvailable(); +} + +sub get_name { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getName; +} + +sub get_session { + my ($self) = @_; + + return $self->{_session}; +} + + + +package qpid::messaging::Receiver; + +sub new { + my ($class) = @_; + my ($self) = { + _impl => $_[1], + _session => $_[2], + }; + + die "Must provide an implementation." unless defined($self->{_impl}); + die "Must provide a Session." unless defined($self->{_session}); + + bless $self, $class; + return $self; +} + +sub get { + my ($self) = @_; + my $duration = $_[1]; + my $impl = $self->{_impl}; + + $duration = $duration->get_implementation() if defined($duration); + + my $message = undef; + + if (defined($duration)) { + $message = $impl->get($duration); + } else { + $message = $impl->get; + } +} + +sub fetch { + my ($self) = @_; + my $duration = $_[1]; + my $impl = $self->{_impl}; + my $message = undef; + + if (defined($duration)) { + $message = $impl->fetch($duration->get_implementation()); + } else { + $message = $impl->fetch; + } + + return new qpid::messaging::Message("", $message); +} + +sub set_capacity { + my ($self) = @_; + my $capacity = $_[1]; + my $impl = $self->{_impl}; + + $impl->setCapacity($capacity); +} + +sub get_capacity { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getCapacity; +} + +sub get_available { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getAvailable; +} + +sub get_unsettled { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getUnsettled; +} + +sub close { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->close; +} + +sub is_closed { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->isClosed; +} + +sub get_name { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getName; +} + +sub get_session { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->{_session}; +} + + + +package qpid::messaging::Session; + +sub new { + my ($class) = @_; + my ($self) = { + _impl => $_[1], + _conn => $_[2], + }; + + die "Must provide an implementation." unless defined($self->{_impl}); + die "Must provide a Connection." unless defined($self->{_conn}); + + bless $self, $class; + return $self; +} + +sub close { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->close; +} + +sub commit { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->commit; +} + +sub rollback { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->rollback; +} + +# TODO how to handle acknowledging a specific message +sub acknowledge { + my ($self) = @_; + my $sync = $_[1] || 0; + + my $impl = $self->{_impl}; + + $impl->acknowledge($sync); +} + +sub acknowledge_up_to { +} + +sub reject { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->reject($_[1]); +} + +sub release { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->release($_[1]); +} + +sub sync { + my ($self) = @_; + my $impl = $self->{_impl}; + + if(defined($_[1])) { + $impl->sync($_[1]); + } else { + $impl->sync; + } +} + +sub get_receivable { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getReceivable; +} + +sub get_unsettled_acks { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getUnsettledAcks; +} + +sub get_next_receiver { + my ($self) = @_; + my $impl = $self->{_impl}; + + my $timeout = $_[1] || qpid::messaging::Duration::FOREVER; + + return $impl->getNextReceiver($timeout); +} + +sub create_sender { + my ($self) = @_; + my $impl = $self->{_impl}; + + my $address = $_[1]; + + if (ref($address) eq "qpid::messaging::Address") { + my $temp = $address->get_implementation(); + $address = $temp; + } + my $send_impl = $impl->createSender($address); + + return new qpid::messaging::Sender($send_impl, $self); +} + +sub create_receiver { + my ($self) = @_; + my $impl = $self->{_impl}; + + my $address = $_[1]; + + if (ref($address) eq "qpid::messaging::Address") { + $address = $address->get_implementation(); + } + my $recv_impl = $impl->createReceiver($address); + + return new qpid::messaging::Receiver($recv_impl, $self); +} + +sub get_sender { + my ($self) = @_; + my $impl = $self->{_impl}; + + my $send_impl = $impl->getSender($_[1]); + my $sender = undef; + + if (defined($send_impl)) { + $sender = new qpid::messaging::Sender($send_impl, $self); + } + + return $sender; +} + +sub get_receiver { + my ($self) = @_; + my $impl = $self->{_impl}; + + my $recv_impl = $impl->getReceiver($_[1]); + my $receiver = undef; + + if (defined($recv_impl)) { + $receiver = new qpid::messaging::Receiver($recv_impl, $self); + } + + return $receiver; +} + +sub get_connection { + my ($self) = @_; + + return $self->{_conn}; +} + +sub has_error { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->hasError; +} + +sub check_for_error { + my ($self) = @_; + my $impl = $self->{_impl}; + + $impl->checkForError; +} + + + +package qpid::messaging::Connection; + +sub new { + my ($class) = @_; + my $self = { + _url => $_[1] || "localhost:5672", + _options => $_[2] || {}, + _impl => $_[3], + }; + + bless $self, $class; + return $self; +} + +sub open { + my ($self) = @_; + my $impl = $self->{_impl}; + + # if we have an implementation instance then use it, otherwise + # create a new implementation instance + unless (defined($impl)) { + my $url = $self->{_url}; + my ($options) = $self->{_options}; + + $impl = new cqpid_perl::Connection($url, $options); + $self->{_impl} = $impl + } + + $impl->open() unless $impl->isOpen() +} + +sub is_open { + my ($self) = @_; + my $impl = $self->{_impl}; + + if (defined($impl) && $impl->isOpen()) { + 1; + } else { + 0; + } +} + +sub close { + my ($self) = @_; + + if ($self->is_open) { + my $impl = $self->{_impl}; + + $impl->close; + $self->{_impl} = undef; + } +} + +sub create_session { + my ($self) = @_; + + die "No connection available." unless ($self->open); + + my $impl = $self->{_impl}; + my $name = $_[1] || ""; + my $session = $impl->createSession($name); + + return new qpid::messaging::Session($session, $self); +} + +sub create_transactional_session { + my ($self) = @_; + + die "No connection available." unless ($self->open); + + my $impl = $self->{_impl}; + my $name = $_[1] || ""; + my $session = $impl->createTransactionalSession($name); + + return new qpid::messaging::Session($session, $self); +} + +sub get_session { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getSession($_[1]); +} + +sub get_authenticated_username { + my ($self) = @_; + my $impl = $self->{_impl}; + + return $impl->getAuthenticatedUsername; +} + +1; diff --git a/qpid/cpp/bindings/qpid/python/CMakeLists.txt b/qpid/cpp/bindings/qpid/python/CMakeLists.txt index 4cbdc99245..2693475dea 100644 --- a/qpid/cpp/bindings/qpid/python/CMakeLists.txt +++ b/qpid/cpp/bindings/qpid/python/CMakeLists.txt @@ -21,12 +21,16 @@ ## Use Swig to generate a literal binding to the C++ API ##------------------------------------------------------ set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i PROPERTIES CPLUSPLUS ON) -set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i PROPERTIES SWIG_FLAGS "-I${qpid-cpp_SOURCE_DIR}/include") +set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/python.i + PROPERTIES SWIG_FLAGS "-I${qpid-cpp_SOURCE_DIR}/include;-I${qpid-cpp_SOURCE_DIR}/bindings") swig_add_module(cqpid_python python ${CMAKE_CURRENT_SOURCE_DIR}/python.i) swig_link_libraries(cqpid_python qpidmessaging qpidtypes qmf2 ${PYTHON_LIBRARIES}) -set_source_files_properties(${swig_generated_file_fullname} PROPERTIES COMPILE_FLAGS "-fno-strict-aliasing -I${PYTHON_INCLUDE_PATH} -I${qpid-cpp_SOURCE_DIR}/include") +set_source_files_properties(${swig_generated_file_fullname} PROPERTIES COMPILE_FLAGS "-fno-strict-aliasing") +include_directories(${PYTHON_INCLUDE_PATH} + ${qpid-cpp_SOURCE_DIR}/include + ${qpid-cpp_SOURCE_DIR}/bindings) ##------------------------------------ ## Install the complete Python binding diff --git a/qpid/cpp/bindings/qpid/python/Makefile.am b/qpid/cpp/bindings/qpid/python/Makefile.am index 432fe7e764..d27cc8b3a2 100644 --- a/qpid/cpp/bindings/qpid/python/Makefile.am +++ b/qpid/cpp/bindings/qpid/python/Makefile.am @@ -19,7 +19,7 @@ if HAVE_PYTHON_DEVEL -INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src/qmf -I$(top_srcdir)/src -I$(top_builddir)/src +INCLUDES = -I$(top_srcdir)/include -I$(top_srcdir)/bindings -I$(top_builddir)/include -I$(top_srcdir)/src/qmf -I$(top_srcdir)/src -I$(top_builddir)/src generated_file_list = \ cqpid.cpp \ @@ -29,7 +29,7 @@ EXTRA_DIST = CMakeLists.txt python.i BUILT_SOURCES = $(generated_file_list) SWIG_FLAGS = -w362,401 -$(generated_file_list): $(srcdir)/python.i $(srcdir)/../qpid.i $(srcdir)/../../swig_python_typemaps.i +$(generated_file_list): $(srcdir)/python.i $(SWIG) -c++ -python $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I$(top_srcdir)/src/qmf -I/usr/include -o cqpid.cpp $(srcdir)/python.i pylibdir = $(pyexecdir) diff --git a/qpid/cpp/bindings/qpid/python/python.i b/qpid/cpp/bindings/qpid/python/python.i index 4d8a64b376..b0c990cc8b 100644 --- a/qpid/cpp/bindings/qpid/python/python.i +++ b/qpid/cpp/bindings/qpid/python/python.i @@ -19,7 +19,7 @@ %module cqpid %include "std_string.i" -%include "../../swig_python_typemaps.i" +%include "swig_python_typemaps.i" /* Needed for get/setPriority methods. Surprising SWIG 1.3.40 doesn't * convert uint8_t by default. */ @@ -159,7 +159,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) %rename(_setTtl) qpid::messaging::Message::setTtl; -%include "../qpid.i" +%include "qpid/qpid.i" %extend qpid::messaging::Connection { %pythoncode %{ diff --git a/qpid/cpp/bindings/qpid/ruby/CMakeLists.txt b/qpid/cpp/bindings/qpid/ruby/CMakeLists.txt index 17c44c0f46..564f5655c8 100644 --- a/qpid/cpp/bindings/qpid/ruby/CMakeLists.txt +++ b/qpid/cpp/bindings/qpid/ruby/CMakeLists.txt @@ -31,7 +31,9 @@ set(GEM_OUTPUT_FILE ${GEM_OUTPUT_PATH}/pkg/qpid-${qpidc_version}.0.gem) ##------------------------------------------------------ set_source_files_properties(${CMAKE_CURRENT_SOURCE_DIR}/ruby.i PROPERTIES CPLUSPLUS ON) -include_directories(${RUBY_INCLUDE_DIRS} ${qpid-cpp_SOURCE_DIR}/include) +include_directories(${RUBY_INCLUDE_DIRS} + ${qpid-cpp_SOURCE_DIR}/include + ${qpid-cpp_SOURCE_DIR}/bindings) swig_add_module(cqpid_ruby ruby ${CMAKE_CURRENT_SOURCE_DIR}/ruby.i) swig_link_libraries(cqpid_ruby qpidmessaging qpidtypes qmf2 ${RUBY_LIBRARY}) diff --git a/qpid/cpp/bindings/qpid/ruby/Makefile.am b/qpid/cpp/bindings/qpid/ruby/Makefile.am index a2a5dd76bd..398449c7ed 100644 --- a/qpid/cpp/bindings/qpid/ruby/Makefile.am +++ b/qpid/cpp/bindings/qpid/ruby/Makefile.am @@ -19,7 +19,7 @@ if HAVE_RUBY_DEVEL -INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src +INCLUDES = -I$(top_srcdir)/include -I$(top_srcdir)/bindings -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src EXTRA_DIST = CMakeLists.txt ruby.i BUILT_SOURCES = cqpid.cpp @@ -27,7 +27,7 @@ SWIG_FLAGS = -w362,401 rubylibdir = $(RUBY_LIB) -cqpid.cpp: $(srcdir)/ruby.i $(srcdir)/../qpid.i $(srcdir)/../../swig_ruby_typemaps.i +cqpid.cpp: $(srcdir)/ruby.i $(SWIG) -ruby -c++ $(SWIG_FLAGS) $(INCLUDES) $(QPID_CXXFLAGS) -I/usr/include -o cqpid.cpp $(srcdir)/ruby.i rubylibarchdir = $(RUBY_LIB_ARCH) diff --git a/qpid/cpp/bindings/qpid/ruby/README.rdoc b/qpid/cpp/bindings/qpid/ruby/README.rdoc index 5d12b89880..90a2fdcf7a 100644 --- a/qpid/cpp/bindings/qpid/ruby/README.rdoc +++ b/qpid/cpp/bindings/qpid/ruby/README.rdoc @@ -3,7 +3,7 @@ Qpid is an cross-platform enterprise messaging system based on the open-source AMQP protocol. -Version :: 0.19.0 +Version :: 0.21.0 = Links diff --git a/qpid/cpp/bindings/qpid/ruby/ruby.i b/qpid/cpp/bindings/qpid/ruby/ruby.i index 13189c93f2..642deb2bcd 100644 --- a/qpid/cpp/bindings/qpid/ruby/ruby.i +++ b/qpid/cpp/bindings/qpid/ruby/ruby.i @@ -21,7 +21,7 @@ /* Ruby doesn't have a != operator*/ #pragma SWIG nowarn=378 %include "std_string.i" -%include "../../swig_ruby_typemaps.i" +%include "swig_ruby_typemaps.i" /* Define the general-purpose exception handling */ %exception { @@ -34,5 +34,5 @@ } } -%include "../qpid.i" +%include "qpid/qpid.i" diff --git a/qpid/cpp/etc/CMakeLists.txt b/qpid/cpp/etc/CMakeLists.txt index 014842c9c7..d9266537b0 100644 --- a/qpid/cpp/etc/CMakeLists.txt +++ b/qpid/cpp/etc/CMakeLists.txt @@ -23,6 +23,11 @@ install(FILES qpidc.conf install(FILES qpidd.conf DESTINATION ${QPID_INSTALL_CONFDIR} COMPONENT ${QPID_COMPONENT_BROKER}) +if (UNIX) + install(FILES qpidd.service + DESTINATION ${QPID_INSTALL_SYSTEMDDIR} + COMPONENT ${QPID_COMPONENT_BROKER}) +endif (UNIX) if (BUILD_SASL) install(FILES sasl2/qpidd.conf DESTINATION ${QPID_INSTALL_SASLDIR} diff --git a/qpid/cpp/etc/Makefile.am b/qpid/cpp/etc/Makefile.am index 7af495f089..80c5fc51eb 100644 --- a/qpid/cpp/etc/Makefile.am +++ b/qpid/cpp/etc/Makefile.am @@ -20,7 +20,7 @@ SASL_CONF = sasl2/qpidd.conf EXTRA_DIST = \ $(SASL_CONF) \ - qpidd.service.in qpidd.in qpidd-primary.in qpidd.conf qpidc.conf CMakeLists.txt \ + qpidd.service qpidd.in qpidd-primary.in qpidd.conf qpidc.conf CMakeLists.txt \ cluster.conf-example.xml.in confdir = $(sysconfdir)/qpid @@ -47,11 +47,11 @@ qpidd-primary: qpidd-primary.in sed $(SUBST) $< > $@ cluster.conf-example.xml: cluster.conf-example.xml.in sed $(SUBST) $< > $@ -qpidd.service: qpidd.service.in - sed $(SUBST) $< > $@ -CLEANFILES = qpidd qpidd-primary cluster.conf-example.xml qpidd.service +CLEANFILES = qpidd qpidd-primary cluster.conf-example.xml initddir = $(sysconfdir)/init.d -nobase_initd_SCRIPTS = qpidd qpidd-primary qpidd.service +nobase_initd_SCRIPTS = qpidd qpidd-primary +systemddir = /usr/lib/systemd/system +nobase_systemd_SCRIPTS = qpidd.service diff --git a/qpid/cpp/etc/qpidd.service.in b/qpid/cpp/etc/qpidd.service index a6549834f4..a6549834f4 100644 --- a/qpid/cpp/etc/qpidd.service.in +++ b/qpid/cpp/etc/qpidd.service diff --git a/qpid/cpp/examples/messaging/Makefile.am b/qpid/cpp/examples/messaging/Makefile.am index 298d65e6f1..d5303f4437 100644 --- a/qpid/cpp/examples/messaging/Makefile.am +++ b/qpid/cpp/examples/messaging/Makefile.am @@ -33,10 +33,10 @@ hello_xml_SOURCES=hello_xml.cpp hello_xml_LDADD=$(CLIENT_LIB) drain_SOURCES=drain.cpp OptionParser.h OptionParser.cpp -drain_LDADD=$(CLIENT_LIB) +drain_LDADD=$(CLIENT_LIB) -lqpidtypes spout_SOURCES=spout.cpp OptionParser.h OptionParser.cpp -spout_LDADD=$(CLIENT_LIB) +spout_LDADD=$(CLIENT_LIB) -lqpidtypes client_SOURCES=client.cpp client_LDADD=$(CLIENT_LIB) @@ -45,10 +45,10 @@ server_SOURCES=server.cpp server_LDADD=$(CLIENT_LIB) map_sender_SOURCES=map_sender.cpp -map_sender_LDADD=$(CLIENT_LIB) +map_sender_LDADD=$(CLIENT_LIB) -lqpidtypes map_receiver_SOURCES=map_receiver.cpp -map_receiver_LDADD=$(CLIENT_LIB) +map_receiver_LDADD=$(CLIENT_LIB) -lqpidtypes examples_DATA= \ hello_world.cpp \ diff --git a/qpid/cpp/examples/messaging/spout.cpp b/qpid/cpp/examples/messaging/spout.cpp index cd11a7ad81..72fcdc7c65 100644 --- a/qpid/cpp/examples/messaging/spout.cpp +++ b/qpid/cpp/examples/messaging/spout.cpp @@ -91,6 +91,7 @@ struct Options : OptionParser std::string value; if (nameval(property, name, value)) { message.getProperties()[name] = value; + message.getProperties()[name].setEncoding("utf8"); } else { message.getProperties()[name] = Variant(); } diff --git a/qpid/cpp/examples/old_api/direct/Makefile.am b/qpid/cpp/examples/old_api/direct/Makefile.am index 24f783fcc7..09709c2bf4 100644 --- a/qpid/cpp/examples/old_api/direct/Makefile.am +++ b/qpid/cpp/examples/old_api/direct/Makefile.am @@ -23,13 +23,13 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=direct_producer listener declare_queues direct_producer_SOURCES=direct_producer.cpp -direct_producer_LDADD=$(CLIENT_LIB) +direct_producer_LDADD=$(CLIENT_LIB) -lqpidcommon listener_SOURCES=listener.cpp -listener_LDADD=$(CLIENT_LIB) +listener_LDADD=$(CLIENT_LIB) -lqpidcommon declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(CLIENT_LIB) +declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon examples_DATA= \ direct_producer.cpp \ diff --git a/qpid/cpp/examples/old_api/failover/Makefile.am b/qpid/cpp/examples/old_api/failover/Makefile.am index 8b1da80f2c..516c3625c1 100644 --- a/qpid/cpp/examples/old_api/failover/Makefile.am +++ b/qpid/cpp/examples/old_api/failover/Makefile.am @@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=declare_queues resuming_receiver replaying_sender declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(CLIENT_LIB) +declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon resuming_receiver_SOURCES=resuming_receiver.cpp -resuming_receiver_LDADD=$(CLIENT_LIB) +resuming_receiver_LDADD=$(CLIENT_LIB) -lqpidcommon replaying_sender_SOURCES=replaying_sender.cpp -replaying_sender_LDADD=$(CLIENT_LIB) +replaying_sender_LDADD=$(CLIENT_LIB) -lqpidcommon examples_DATA= \ declare_queues.cpp \ diff --git a/qpid/cpp/examples/old_api/fanout/Makefile.am b/qpid/cpp/examples/old_api/fanout/Makefile.am index 3ab43b0279..797312a72d 100644 --- a/qpid/cpp/examples/old_api/fanout/Makefile.am +++ b/qpid/cpp/examples/old_api/fanout/Makefile.am @@ -26,7 +26,7 @@ fanout_producer_SOURCES=fanout_producer.cpp fanout_producer_LDADD=$(CLIENT_LIB) listener_SOURCES=listener.cpp -listener_LDADD=$(CLIENT_LIB) +listener_LDADD=$(CLIENT_LIB) -lqpidcommon examples_DATA= \ fanout_producer.cpp \ diff --git a/qpid/cpp/examples/old_api/pub-sub/Makefile.am b/qpid/cpp/examples/old_api/pub-sub/Makefile.am index 8f42ee0211..fc61236475 100644 --- a/qpid/cpp/examples/old_api/pub-sub/Makefile.am +++ b/qpid/cpp/examples/old_api/pub-sub/Makefile.am @@ -24,10 +24,10 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=topic_listener topic_publisher topic_listener_SOURCES=topic_listener.cpp -topic_listener_LDADD=$(CLIENT_LIB) +topic_listener_LDADD=$(CLIENT_LIB) -lqpidcommon topic_publisher_SOURCES=topic_publisher.cpp -topic_publisher_LDADD=$(CLIENT_LIB) +topic_publisher_LDADD=$(CLIENT_LIB) -lqpidcommon examples_DATA= \ topic_listener.cpp \ diff --git a/qpid/cpp/examples/old_api/request-response/Makefile.am b/qpid/cpp/examples/old_api/request-response/Makefile.am index f48762da51..92f5bc6558 100644 --- a/qpid/cpp/examples/old_api/request-response/Makefile.am +++ b/qpid/cpp/examples/old_api/request-response/Makefile.am @@ -24,10 +24,10 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=client server client_SOURCES=client.cpp -client_LDADD=$(CLIENT_LIB) +client_LDADD=$(CLIENT_LIB) -lqpidcommon server_SOURCES=server.cpp -server_LDADD=$(CLIENT_LIB) +server_LDADD=$(CLIENT_LIB) -lqpidcommon examples_DATA= \ server.cpp \ diff --git a/qpid/cpp/examples/old_api/tradedemo/Makefile.am b/qpid/cpp/examples/old_api/tradedemo/Makefile.am index 445b15b367..a05bbc3780 100644 --- a/qpid/cpp/examples/old_api/tradedemo/Makefile.am +++ b/qpid/cpp/examples/old_api/tradedemo/Makefile.am @@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=topic_listener topic_publisher declare_queues topic_listener_SOURCES=topic_listener.cpp -topic_listener_LDADD=$(CLIENT_LIB) +topic_listener_LDADD=$(CLIENT_LIB) -lqpidcommon topic_publisher_SOURCES=topic_publisher.cpp -topic_publisher_LDADD=$(CLIENT_LIB) +topic_publisher_LDADD=$(CLIENT_LIB) -lqpidcommon declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(CLIENT_LIB) +declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon examples_DATA= \ diff --git a/qpid/cpp/examples/old_api/xml-exchange/Makefile.am b/qpid/cpp/examples/old_api/xml-exchange/Makefile.am index 3e1082cdb2..9391806849 100644 --- a/qpid/cpp/examples/old_api/xml-exchange/Makefile.am +++ b/qpid/cpp/examples/old_api/xml-exchange/Makefile.am @@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=declare_queues xml_producer listener declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(CLIENT_LIB) +declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon xml_producer_SOURCES=xml_producer.cpp -xml_producer_LDADD=$(CLIENT_LIB) +xml_producer_LDADD=$(CLIENT_LIB) -lqpidcommon listener_SOURCES=listener.cpp -listener_LDADD=$(CLIENT_LIB) +listener_LDADD=$(CLIENT_LIB) -lqpidcommon EXTRA_DIST= \ README.txt \ diff --git a/qpid/cpp/examples/qmf-console/Makefile.am b/qpid/cpp/examples/qmf-console/Makefile.am index f4cbb7633c..060147e9a4 100644 --- a/qpid/cpp/examples/qmf-console/Makefile.am +++ b/qpid/cpp/examples/qmf-console/Makefile.am @@ -25,19 +25,19 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=console printevents ping queuestats cluster-qmon console_SOURCES=console.cpp -console_LDADD=$(CONSOLE_LIB) +console_LDADD=$(CONSOLE_LIB) -lqpidcommon -lqpidclient printevents_SOURCES=printevents.cpp -printevents_LDADD=$(CONSOLE_LIB) +printevents_LDADD=$(CONSOLE_LIB) -lqpidcommon -lqpidclient ping_SOURCES=ping.cpp -ping_LDADD=$(CONSOLE_LIB) +ping_LDADD=$(CONSOLE_LIB) -lqpidcommon -lqpidclient queuestats_SOURCES=queuestats.cpp -queuestats_LDADD=$(CONSOLE_LIB) +queuestats_LDADD=$(CONSOLE_LIB) -lqpidcommon -lqpidclient cluster_qmon_SOURCES=cluster-qmon.cpp -cluster_qmon_LDADD=$(CONSOLE_LIB) +cluster_qmon_LDADD=$(CONSOLE_LIB) -lqpidcommon -lqpidclient examples_DATA= \ console.cpp \ diff --git a/qpid/cpp/bindings/qmf2/qmf2.i b/qpid/cpp/include/qmf/qmf2.i index 0f573fe3e6..0f573fe3e6 100644 --- a/qpid/cpp/bindings/qmf2/qmf2.i +++ b/qpid/cpp/include/qmf/qmf2.i diff --git a/qpid/cpp/bindings/qmf/qmfengine.i b/qpid/cpp/include/qmf/qmfengine.i index eb350115a3..eb350115a3 100644 --- a/qpid/cpp/bindings/qmf/qmfengine.i +++ b/qpid/cpp/include/qmf/qmfengine.i diff --git a/qpid/cpp/include/qpid/management/Manageable.h b/qpid/cpp/include/qpid/management/Manageable.h index fd1e604f58..e72dc0b332 100644 --- a/qpid/cpp/include/qpid/management/Manageable.h +++ b/qpid/cpp/include/qpid/management/Manageable.h @@ -55,7 +55,8 @@ class QPID_COMMON_EXTERN Manageable // // This accessor function returns a pointer to the management object. // - virtual ManagementObject::shared_ptr GetManagementObject(void) const = 0; + virtual ManagementObject* GetManagementObject() const; + virtual ManagementObject::shared_ptr GetManagementObjectShared() const; // Every "Manageable" object must implement ManagementMethod. This // function is called when a remote management client invokes a method diff --git a/qpid/cpp/bindings/qpid/qpid.i b/qpid/cpp/include/qpid/qpid.i index 352bafa3c8..352bafa3c8 100644 --- a/qpid/cpp/bindings/qpid/qpid.i +++ b/qpid/cpp/include/qpid/qpid.i diff --git a/qpid/cpp/managementgen/qmf-gen b/qpid/cpp/managementgen/qmf-gen index 2f0cc0d8fd..fc2f284578 100755 --- a/qpid/cpp/managementgen/qmf-gen +++ b/qpid/cpp/managementgen/qmf-gen @@ -68,9 +68,11 @@ vargs = {} if opts.brokerplugin: vargs["agentHeaderDir"] = "management" vargs["genQmfV1"] = True + vargs["genForBroker"] = True else: vargs["agentHeaderDir"] = "agent" vargs["genQmfV1"] = None + vargs["genForBroker"] = None if opts.qpidlogs: vargs["genLogs"] = True diff --git a/qpid/cpp/managementgen/qmfgen/generate.py b/qpid/cpp/managementgen/qmfgen/generate.py index 61111be01d..a7ad43cc30 100755 --- a/qpid/cpp/managementgen/qmfgen/generate.py +++ b/qpid/cpp/managementgen/qmfgen/generate.py @@ -307,11 +307,22 @@ class Generator: def testGenLogs (self, variables): return variables["genLogs"] + def testInBroker (self, variables): + return variables['genForBroker'] + def genDisclaimer (self, stream, variables): prefix = variables["commentPrefix"] stream.write (prefix + " This source file was created by a code generator.\n") stream.write (prefix + " Please do not edit.") + def genExternClass (self, stream, variables): + if variables['genForBroker']: + stream.write("QPID_BROKER_CLASS_EXTERN") + + def genExternMethod (self, stream, variables): + if variables['genForBroker']: + stream.write("QPID_BROKER_EXTERN") + def fileExt (self, path): dot = path.rfind (".") if dot == -1: diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index dc8ffae446..cfbc88f7a9 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -1476,8 +1476,11 @@ class SchemaClass: def genMethodIdDeclarations (self, stream, variables): number = 1 + ext = "" + if variables['genForBroker']: + ext = "QPID_BROKER_EXTERN " for method in self.methods: - stream.write (" QPID_BROKER_EXTERN static const uint32_t METHOD_" + method.getName().upper() +\ + stream.write (" " + ext + "static const uint32_t METHOD_" + method.getName().upper() +\ " = %d;\n" % number) number = number + 1 @@ -1520,8 +1523,12 @@ class SchemaClass: def genParentRefAssignment (self, stream, variables): for config in self.properties: if config.isParentRef == 1: - stream.write (config.getName () + \ - " = _parent->GetManagementObject ()->getObjectId ();") + if variables['genForBroker']: + stream.write (config.getName () + \ + " = _parent->GetManagementObjectShared()->getObjectId ();") + else: + stream.write (config.getName () + \ + " = _parent->GetManagementObject()->getObjectId ();") return def genSchemaMD5 (self, stream, variables): diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h index bb9af15525..362d268aba 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h @@ -24,7 +24,9 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/ManagementObject.h" +/*MGEN:IF(Root.InBroker)*/ #include "qmf/BrokerImportExport.h" +/*MGEN:ENDIF*/ #include <limits> namespace qpid { @@ -36,7 +38,7 @@ namespace qpid { namespace qmf { /*MGEN:Class.OpenNamespaces*/ -QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject +/*MGEN:Root.ExternClass*/ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject { private: @@ -79,22 +81,22 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::managemen public: typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; - QPID_BROKER_EXTERN static void writeSchema(std::string& schema); - QPID_BROKER_EXTERN void mapEncodeValues(::qpid::types::Variant::Map& map, + /*MGEN:Root.ExternMethod*/ static void writeSchema(std::string& schema); + /*MGEN:Root.ExternMethod*/ void mapEncodeValues(::qpid::types::Variant::Map& map, bool includeProperties=true, bool includeStatistics=true); - QPID_BROKER_EXTERN void mapDecodeValues(const ::qpid::types::Variant::Map& map); - QPID_BROKER_EXTERN void doMethod(std::string& methodName, + /*MGEN:Root.ExternMethod*/ void mapDecodeValues(const ::qpid::types::Variant::Map& map); + /*MGEN:Root.ExternMethod*/ void doMethod(std::string& methodName, const ::qpid::types::Variant::Map& inMap, ::qpid::types::Variant::Map& outMap, const std::string& userId); - QPID_BROKER_EXTERN std::string getKey() const; + /*MGEN:Root.ExternMethod*/ std::string getKey() const; /*MGEN:IF(Root.GenQMFv1)*/ - QPID_BROKER_EXTERN uint32_t writePropertiesSize() const; - QPID_BROKER_EXTERN void readProperties(const std::string& buf); - QPID_BROKER_EXTERN void writeProperties(std::string& buf) const; - QPID_BROKER_EXTERN void writeStatistics(std::string& buf, bool skipHeaders = false); - QPID_BROKER_EXTERN void doMethod(std::string& methodName, + /*MGEN:Root.ExternMethod*/ uint32_t writePropertiesSize() const; + /*MGEN:Root.ExternMethod*/ void readProperties(const std::string& buf); + /*MGEN:Root.ExternMethod*/ void writeProperties(std::string& buf) const; + /*MGEN:Root.ExternMethod*/ void writeStatistics(std::string& buf, bool skipHeaders = false); + /*MGEN:Root.ExternMethod*/ void doMethod(std::string& methodName, const std::string& inBuf, std::string& outBuf, const std::string& userId); @@ -107,15 +109,15 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::managemen bool hasInst() { return false; } /*MGEN:ENDIF*/ - QPID_BROKER_EXTERN /*MGEN:Class.NameCap*/( + /*MGEN:Root.ExternMethod*/ /*MGEN:Class.NameCap*/( ::qpid::management::ManagementAgent* agent, ::qpid::management::Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); - QPID_BROKER_EXTERN ~/*MGEN:Class.NameCap*/(); + /*MGEN:Root.ExternMethod*/ ~/*MGEN:Class.NameCap*/(); /*MGEN:Class.SetGeneralReferenceDeclaration*/ - QPID_BROKER_EXTERN static void registerSelf( + /*MGEN:Root.ExternMethod*/ static void registerSelf( ::qpid::management::ManagementAgent* agent); std::string& getPackageName() const { return packageName; } diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.h b/qpid/cpp/managementgen/qmfgen/templates/Event.h index c1d6136a34..e5f5f53c1f 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Event.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Event.h @@ -24,36 +24,38 @@ /*MGEN:Root.Disclaimer*/ #include "qpid/management/ManagementEvent.h" +/*MGEN:IF(Root.InBroker)*/ #include "qmf/BrokerImportExport.h" +/*MGEN:ENDIF*/ namespace qmf { /*MGEN:Event.OpenNamespaces*/ -QPID_BROKER_CLASS_EXTERN class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent +/*MGEN:Root.ExternClass*/ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent { private: static void writeSchema (std::string& schema); static uint8_t md5Sum[MD5_LEN]; - QPID_BROKER_EXTERN static std::string packageName; - QPID_BROKER_EXTERN static std::string eventName; + /*MGEN:Root.ExternMethod*/ static std::string packageName; + /*MGEN:Root.ExternMethod*/ static std::string eventName; /*MGEN:Event.ArgDeclarations*/ public: writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; } - QPID_BROKER_EXTERN Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/); - QPID_BROKER_EXTERN ~Event/*MGEN:Event.NameCap*/() {}; + /*MGEN:Root.ExternMethod*/ Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/); + /*MGEN:Root.ExternMethod*/ ~Event/*MGEN:Event.NameCap*/() {}; static void registerSelf(::qpid::management::ManagementAgent* agent); std::string& getPackageName() const { return packageName; } std::string& getEventName() const { return eventName; } uint8_t* getMd5Sum() const { return md5Sum; } uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; } - QPID_BROKER_EXTERN void encode(std::string& buffer) const; - QPID_BROKER_EXTERN void mapEncode(::qpid::types::Variant::Map& map) const; + /*MGEN:Root.ExternMethod*/ void encode(std::string& buffer) const; + /*MGEN:Root.ExternMethod*/ void mapEncode(::qpid::types::Variant::Map& map) const; - QPID_BROKER_EXTERN static bool match(const std::string& evt, const std::string& pkg); + /*MGEN:Root.ExternMethod*/ static bool match(const std::string& evt, const std::string& pkg); static std::pair<std::string,std::string> getFullName() { return std::make_pair(packageName, eventName); } diff --git a/qpid/cpp/managementgen/qmfgen/templates/Package.h b/qpid/cpp/managementgen/qmfgen/templates/Package.h index 3a42f12f9d..3260b03cce 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Package.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Package.h @@ -24,7 +24,9 @@ /*MGEN:Root.Disclaimer*/ #include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h" +/*MGEN:IF(Root.InBroker)*/ #include "qmf/BrokerImportExport.h" +/*MGEN:ENDIF*/ namespace qmf { /*MGEN:Class.OpenNamespaces*/ @@ -32,8 +34,8 @@ namespace qmf { class Package { public: - QPID_BROKER_EXTERN Package (::qpid::management::ManagementAgent* agent); - QPID_BROKER_EXTERN ~Package () {} + /*MGEN:Root.ExternMethod*/ Package (::qpid::management::ManagementAgent* agent); + /*MGEN:Root.ExternMethod*/ ~Package () {} }; }/*MGEN:Class.CloseNamespaces*/ diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 05c0ec6302..91ff0621c0 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -146,7 +146,8 @@ libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduleexecdir qpidd_LDADD = \ libqpidbroker.la \ - libqpidcommon.la + libqpidcommon.la \ + -lboost_program_options posix_qpidd_src = posix/QpiddBroker.cpp diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp index a681a6d18d..40857f411f 100644 --- a/qpid/cpp/src/posix/QpiddBroker.cpp +++ b/qpid/cpp/src/posix/QpiddBroker.cpp @@ -144,7 +144,7 @@ struct QpiddDaemon : public Daemon { uint16_t port=brokerPtr->getPort(options->daemon.transport); ready(port); // Notify parent. if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) { - boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); + boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port); } brokerPtr->run(); } @@ -200,7 +200,7 @@ int QpiddBroker::execute (QpiddOptions *options) { uint16_t port = brokerPtr->getPort(myOptions->daemon.transport); cout << port << endl; if (options->broker.enableMgmt) { - boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); + boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port); } } brokerPtr->run(); diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index 61e0b56104..3634c0cdc1 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -317,7 +317,7 @@ Acl::~Acl(){ broker->getConnectionObservers().remove(connectionCounter); } -ManagementObject::shared_ptr Acl::GetManagementObject(void) const +ManagementObject::shared_ptr Acl::GetManagementObjectShared(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h index ea3c6586a3..8c1a925713 100644 --- a/qpid/cpp/src/qpid/acl/Acl.h +++ b/qpid/cpp/src/qpid/acl/Acl.h @@ -117,7 +117,7 @@ private: bool readAclFile(std::string& aclFile, std::string& errorText); Manageable::status_t lookup (management::Args& args, std::string& text); Manageable::status_t lookupPublish(management::Args& args, std::string& text); - virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const; + virtual qpid::management::ManagementObject::shared_ptr GetManagementObjectShared(void) const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp index 4c14c8e4d9..9c577e6c92 100644 --- a/qpid/cpp/src/qpid/amqp/Decoder.cpp +++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp @@ -540,5 +540,6 @@ CharSequence Decoder::readRawUuid() } size_t Decoder::getPosition() const { return position; } +size_t Decoder::getSize() const { return size; } void Decoder::resetSize(size_t s) { size = s; } }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Decoder.h b/qpid/cpp/src/qpid/amqp/Decoder.h index cf3e2d36d1..7ddfe0f17f 100644 --- a/qpid/cpp/src/qpid/amqp/Decoder.h +++ b/qpid/cpp/src/qpid/amqp/Decoder.h @@ -71,6 +71,7 @@ class Decoder QPID_COMMON_EXTERN void advance(size_t); QPID_COMMON_EXTERN size_t getPosition() const; QPID_COMMON_EXTERN void resetSize(size_t size); + QPID_COMMON_EXTERN size_t getSize() const; private: const char* const start; diff --git a/qpid/cpp/src/qpid/amqp/Sasl.cpp b/qpid/cpp/src/qpid/amqp/Sasl.cpp index 6d0a7ccb1f..7b0779fe94 100644 --- a/qpid/cpp/src/qpid/amqp/Sasl.cpp +++ b/qpid/cpp/src/qpid/amqp/Sasl.cpp @@ -58,29 +58,35 @@ void Sasl::endFrame(void* frame) std::size_t Sasl::read(const char* data, size_t available) { - Decoder decoder(data, available); - //read frame-header - uint32_t frameSize = decoder.readUInt(); - QPID_LOG(trace, "Reading SASL frame of size " << frameSize); - decoder.resetSize(frameSize); - uint8_t dataOffset = decoder.readUByte(); - uint8_t frameType = decoder.readUByte(); - if (frameType != 0x01) { - QPID_LOG(error, "Expected SASL frame; got type " << frameType); - } - uint16_t ignored = decoder.readUShort(); - if (ignored) { - QPID_LOG(info, "Got non null bytes at end of SASL frame header"); - } + size_t consumed = 0; + while (available - consumed > 4/*framesize*/) { + Decoder decoder(data+consumed, available-consumed); + //read frame-header + uint32_t frameSize = decoder.readUInt(); + if (frameSize > decoder.getSize()) break;//don't have all the data for this frame yet + + QPID_LOG(trace, "Reading SASL frame of size " << frameSize); + decoder.resetSize(frameSize); + uint8_t dataOffset = decoder.readUByte(); + uint8_t frameType = decoder.readUByte(); + if (frameType != 0x01) { + QPID_LOG(error, "Expected SASL frame; got type " << frameType); + } + uint16_t ignored = decoder.readUShort(); + if (ignored) { + QPID_LOG(info, "Got non null bytes at end of SASL frame header"); + } - //body is at offset 4*dataOffset from the start - size_t skip = dataOffset*4 - 8; - if (skip) { - QPID_LOG(info, "Offset for sasl frame was not as expected"); - decoder.advance(skip); + //body is at offset 4*dataOffset from the start + size_t skip = dataOffset*4 - 8; + if (skip) { + QPID_LOG(info, "Offset for sasl frame was not as expected"); + decoder.advance(skip); + } + decoder.read(*this); + consumed += decoder.getPosition(); } - decoder.read(*this); - return decoder.getPosition(); + return consumed; } std::size_t Sasl::write(char* data, size_t size) diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 4b4954a0df..90cb1a79ed 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -298,7 +298,7 @@ uint32_t Bridge::encodedSize() const + 2; // sync } -management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const +management::ManagementObject::shared_ptr Bridge::GetManagementObjectShared (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index da397b8f77..9f99c9ce01 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -73,7 +73,7 @@ class Bridge : public PersistableConfig, bool isDetached() const { return detached; } - management::ManagementObject::shared_ptr GetManagementObject() const; + management::ManagementObject::shared_ptr GetManagementObjectShared() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index cb93abfac7..292820abe4 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& conf) : systemObject = System::shared_ptr(system); mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker")); - mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); + mgmtObject->set_systemRef(system->GetManagementObjectShared()->getObjectId()); mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); mgmtObject->set_connBacklog(conf.connectionBacklog); @@ -454,7 +454,7 @@ Broker::~Broker() { QPID_LOG(notice, "Shut down"); } -ManagementObject::shared_ptr Broker::GetManagementObject(void) const +ManagementObject::shared_ptr Broker::GetManagementObjectShared(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 0a8f406dbf..eecfd3925c 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -235,7 +235,7 @@ class Broker : public sys::Runnable, public Plugin::Target, SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } - QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared() const; QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const; QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod( uint32_t methodId, management::Args& args, std::string& text); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 3cb30a82e3..238bb71fb5 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -402,7 +402,7 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject::shared_ptr Connection::GetManagementObject(void) const +ManagementObject::shared_ptr Connection::GetManagementObjectShared(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 3ef9877750..91470dc3df 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -112,7 +112,7 @@ class Connection : public sys::ConnectionInputHandler, void closeChannel(framing::ChannelId channel); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::ManagementObject::shared_ptr GetManagementObjectShared (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string&); diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 9098c75f0b..20bd76f645 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -177,7 +177,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : mgmtExchange->set_autoDelete(false); agent->addObject(mgmtExchange, 0, durable); if (broker) - brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); + brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared()); } } } @@ -198,7 +198,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange->set_arguments(ManagementAgent::toMap(args)); agent->addObject(mgmtExchange, 0, durable); if (broker) - brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); + brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared()); } } @@ -227,7 +227,7 @@ void Exchange::setAlternate(Exchange::shared_ptr _alternate) alternate = _alternate; if (mgmtExchange != 0) { if (alternate.get() != 0) - mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId()); + mgmtExchange->set_altExchange(alternate->GetManagementObjectShared()->getObjectId()); else mgmtExchange->clr_altExchange(); } @@ -294,7 +294,7 @@ void Exchange::recoveryComplete(ExchangeRegistry& exchanges) } } -ManagementObject::shared_ptr Exchange::GetManagementObject (void) const +ManagementObject::shared_ptr Exchange::GetManagementObjectShared (void) const { return mgmtExchange; } @@ -352,7 +352,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang Exchange::Binding::~Binding () { if (mgmtBinding != 0) { - _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); + _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared()); if (mo != 0) mo->dec_bindingCount(); mgmtBinding->resourceDestroy (); @@ -367,7 +367,7 @@ void Exchange::Binding::startManagement() if (broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); + _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared()); if (mo != 0) { management::ObjectId queueId = mo->getObjectId(); @@ -383,7 +383,7 @@ void Exchange::Binding::startManagement() } } -ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const +ManagementObject::shared_ptr Exchange::Binding::GetManagementObjectShared () const { return mgmtBinding; } diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 70ed393f64..ec9a0bea2f 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -58,7 +58,7 @@ public: framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); ~Binding(); void startManagement(); - management::ManagementObject::shared_ptr GetManagementObject() const; + management::ManagementObject::shared_ptr GetManagementObjectShared() const; }; private: @@ -210,7 +210,7 @@ public: static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared(void) const; // Federation hooks class DynamicBridge { diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 84a3a9ccb1..db789d79cf 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -292,8 +292,8 @@ void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; - if (!hideManagement() && connection->GetManagementObject()) { - mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); + if (!hideManagement() && connection->GetManagementObjectShared()) { + mgmtObject->set_connectionRef(connection->GetManagementObjectShared()->getObjectId()); } // Get default URL from known-hosts if not already set @@ -669,7 +669,7 @@ uint32_t Link::encodedSize() const + password.size() + 1; } -ManagementObject::shared_ptr Link::GetManagementObject (void) const +ManagementObject::shared_ptr Link::GetManagementObjectShared (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 97511de08f..2087b5259c 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -183,7 +183,7 @@ class Link : public PersistableConfig, public management::Manageable { static bool isEncodedLink(const std::string& key); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject(void) const; + management::ManagementObject::shared_ptr GetManagementObjectShared(void) const; management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); // manage the exchange owned by this link diff --git a/qpid/cpp/src/qpid/broker/Protocol.cpp b/qpid/cpp/src/qpid/broker/Protocol.cpp index 90d4d7833f..e236698142 100644 --- a/qpid/cpp/src/qpid/broker/Protocol.cpp +++ b/qpid/cpp/src/qpid/broker/Protocol.cpp @@ -42,6 +42,7 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolReg for (Protocols::const_iterator i = protocols.begin(); !transfer && i != protocols.end(); ++i) { transfer = i->second->translate(m); } + if (!transfer) throw new Exception("Could not convert message into 0-10"); return transfer; } boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index eb72db3a7b..271e8476f9 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -198,7 +198,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete)); mgmtObject->set_arguments(settings.asMap()); agent->addObject(mgmtObject, 0, store != 0); - brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject()); + brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared()); if (brokerMgmtObject) brokerMgmtObject->inc_queueCount(); } @@ -1108,7 +1108,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore) { - ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject(); + ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObjectShared(); if (childObj != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1154,7 +1154,7 @@ void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) alternateExchange = exchange; if (mgmtObject) { if (exchange.get() != 0) - mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId()); + mgmtObject->set_altExchange(exchange->GetManagementObjectShared()->getObjectId()); else mgmtObject->clr_altExchange(); } @@ -1258,7 +1258,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { externalQueueStore = inst; if (inst) { - ManagementObject::shared_ptr childObj = inst->GetManagementObject(); + ManagementObject::shared_ptr childObj = inst->GetManagementObjectShared(); if (childObj != 0 && mgmtObject != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1306,7 +1306,7 @@ void Queue::countLoadedFromDisk(uint64_t size) const } -ManagementObject::shared_ptr Queue::GetManagementObject (void) const +ManagementObject::shared_ptr Queue::GetManagementObjectShared (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 3a668276e8..25cefd144d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -340,7 +340,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const; // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject (void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared (void) const; management::Manageable::status_t QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const; diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 944cc7e838..9d6053669b 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -78,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount(); if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize(); broker = queue->getBroker(); - queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject()); + queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObjectShared()); if (queueMgmtObj) { queueMgmtObj->set_flowStopped(isFlowControlActive()); } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 6e42f8d746..0dc8d6cdfe 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -303,14 +303,14 @@ Consumer(_name, type), deliveryCount(0), protocols(parent->getSession().getBroker().getProtocolRegistry()) { - if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) + if (parent != 0 && queue.get() != 0 && queue->GetManagementObjectShared() !=0) { ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); if (agent != 0) { - mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), + mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObjectShared()->getObjectId(), getTag(), !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments))); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); @@ -318,7 +318,7 @@ Consumer(_name, type), } } -ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const +ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObjectShared (void) const { return mgmtObject; } @@ -398,7 +398,8 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) { void SemanticState::ConsumerImpl::allocateCredit(const Message& msg) { Credit original = credit; - credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg)); + boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); + credit.consume(1, transfer->getRequiredCredit()); QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << original << " now " << credit); @@ -406,9 +407,10 @@ void SemanticState::ConsumerImpl::allocateCredit(const Message& msg) bool SemanticState::ConsumerImpl::checkCredit(const Message& msg) { - bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg)); + boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); + bool enoughCredit = credit.check(1, transfer->getRequiredCredit()); QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient") - << " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: " + << " credit for message of " << transfer->getRequiredCredit() << " bytes: " << credit); return enoughCredit; } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index be7c8d490a..afb527b0f5 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -163,7 +163,7 @@ class SemanticState : private boost::noncopyable { // manageable entry points QPID_BROKER_EXTERN management::ManagementObject::shared_ptr - GetManagementObject(void) const; + GetManagementObjectShared(void) const; QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 42be45f7ce..f48bf653fb 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -65,7 +65,7 @@ SessionState::SessionState( } void SessionState::addManagementObject() { - if (GetManagementObject()) return; // Already added. + if (GetManagementObjectShared()) return; // Already added. Manageable* parent = broker.GetVhostObject (); if (parent != 0) { ManagementAgent* agent = getBroker().getManagementAgent(); @@ -127,7 +127,7 @@ void SessionState::attach(SessionHandler& h) { if (mgmtObject != 0) { mgmtObject->set_attached (1); - mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObjectShared()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } asyncCommandCompleter->attached(); @@ -148,7 +148,7 @@ void SessionState::giveReadCredit(int32_t credit) { getConnection().outputTasks.giveReadCredit(credit); } -ManagementObject::shared_ptr SessionState::GetManagementObject (void) const +ManagementObject::shared_ptr SessionState::GetManagementObjectShared (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index af384ff761..06643fdbef 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -110,7 +110,7 @@ class SessionState : public qpid::SessionState, const qpid::types::Variant::Map& annotations, bool sync); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::ManagementObject::shared_ptr GetManagementObjectShared (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string&); diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h index 591d2a14a6..179a3275a7 100644 --- a/qpid/cpp/src/qpid/broker/System.h +++ b/qpid/cpp/src/qpid/broker/System.h @@ -45,7 +45,7 @@ class System : public management::Manageable System (std::string _dataDir, Broker* broker = 0); - management::ManagementObject::shared_ptr GetManagementObject (void) const + management::ManagementObject::shared_ptr GetManagementObjectShared (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h index 599b821870..c4b1c280e1 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.h +++ b/qpid/cpp/src/qpid/broker/Vhost.h @@ -40,7 +40,7 @@ class Vhost : public management::Manageable Vhost (management::Manageable* parentBroker, Broker* broker = 0); - management::ManagementObject::shared_ptr GetManagementObject (void) const + management::ManagementObject::shared_ptr GetManagementObjectShared (void) const { return mgmtObject; } void setFederationTag(const std::string& tag); }; diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 329b4263ee..1f135cf931 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -157,6 +157,7 @@ void Connection::process() QPID_LOG(trace, id << " process()"); if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) { QPID_LOG_CAT(debug, model, id << " connection opened"); + pn_connection_set_container(connection, broker.getFederationTag().c_str()); pn_connection_open(connection); } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp index 0253ba5552..8daf860f8e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp @@ -73,7 +73,7 @@ void ManagedConnection::setSaslSsf(int ssf) connection->set_saslSsf(ssf); } -qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const +qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObjectShared() const { return connection; } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h index e2d0376918..f1514d11c5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h @@ -44,7 +44,7 @@ class ManagedConnection : public qpid::management::Manageable, public Connection std::string getUserid() const; void setSaslMechanism(const std::string&); void setSaslSsf(int); - qpid::management::ManagementObject::shared_ptr GetManagementObject() const; + qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const; bool isLocal(const ConnectionToken* t) const; void incomingMessageReceived(); void outgoingMessageSent(); diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp index f36a1e8da4..0fe20f68ab 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp @@ -37,7 +37,7 @@ ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, Queue& q, ManagedSessio { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id, + subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObjectShared()->getObjectId(), id, false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map())); agent->addObject(subscription); subscription->set_creditMode("n/a"); @@ -48,7 +48,7 @@ ManagedOutgoingLink::~ManagedOutgoingLink() if (subscription != 0) subscription->resourceDestroy(); } -qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const +qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObjectShared() const { return subscription; } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h index 20a1095db2..19667da698 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h @@ -39,7 +39,7 @@ class ManagedOutgoingLink : public qpid::management::Manageable public: ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic); virtual ~ManagedOutgoingLink(); - qpid::management::ManagementObject::shared_ptr GetManagementObject() const; + qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const; void outgoingMessageSent(); void outgoingMessageAccepted(); void outgoingMessageRejected(); diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp index 9bef0e842b..f1c4940118 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp @@ -38,7 +38,7 @@ ManagedSession::ManagedSession(Broker& broker, ManagedConnection& p, const std:: session->set_attached(true); session->set_detachedLifespan(0); session->clr_expireTime(); - session->set_connectionRef(parent.GetManagementObject()->getObjectId()); + session->set_connectionRef(parent.GetManagementObjectShared()->getObjectId()); agent->addObject(session); } } @@ -48,7 +48,7 @@ ManagedSession::~ManagedSession() if (session) session->resourceDestroy(); } -qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObject() const +qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObjectShared() const { return session; } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h index 1f56964bb6..2f62c8705a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h @@ -40,7 +40,7 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke public: ManagedSession(Broker& broker, ManagedConnection& parent, const std::string id); virtual ~ManagedSession(); - qpid::management::ManagementObject::shared_ptr GetManagementObject() const; + qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const; bool isLocal(const ConnectionToken* t) const; void incomingMessageReceived(); void incomingMessageAccepted(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp index af67f2ce22..a4c346e131 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp @@ -94,6 +94,7 @@ Message::Message(size_t size) : data(size) applicationProperties.init(); body.init(); + footer.init(); } char* Message::getData() { return &data[0]; } const char* Message::getData() const { return &data[0]; } @@ -140,6 +141,10 @@ qpid::amqp::CharSequence Message::getBody() const { return body; } +qpid::amqp::CharSequence Message::getFooter() const +{ + return footer; +} void Message::scan() { diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h index d4a97c928a..cc3406f72a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.h +++ b/qpid/cpp/src/qpid/broker/amqp/Message.h @@ -63,6 +63,7 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess qpid::amqp::CharSequence getApplicationProperties() const; qpid::amqp::CharSequence getBareMessage() const; qpid::amqp::CharSequence getBody() const; + qpid::amqp::CharSequence getFooter() const; Message(size_t size); char* getData(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 665bf2def4..9605cacac1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -145,6 +145,7 @@ void Outgoing::detached() bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg) { Record& r = deliveries[current++]; + if (current >= deliveries.capacity()) current = 0; r.cursor = cursor; r.msg = msg; pn_delivery(link, r.tag); @@ -161,7 +162,7 @@ void Outgoing::notify() bool Outgoing::accept(const qpid::broker::Message&) { - return canDeliver(); + return true; } void Outgoing::setSubjectFilter(const std::string& f) diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 760fa2d902..fabe609473 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -53,32 +53,33 @@ namespace amqp { class Target { public: + Target(pn_link_t* l) : credit(100), window(0), link(l) {} virtual ~Target() {} - virtual void flow() = 0; + bool flow(); + bool needFlow(); virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message - private: + protected: + const uint32_t credit; + uint32_t window; + pn_link_t* link; }; class Queue : public Target { public: - Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : queue(q), link(l) {} - void flow(); + Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Queue> queue; - pn_link_t* link; }; class Exchange : public Target { public: - Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : exchange(e), link(l) {} - void flow(); + Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Exchange> exchange; - pn_link_t* link; }; Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o) @@ -169,11 +170,9 @@ void Session::attach(pn_link_t* link) if (node.queue) { boost::shared_ptr<Target> q(new Queue(node.queue, link)); targets[link] = q; - q->flow(); } else if (node.exchange) { boost::shared_ptr<Target> e(new Exchange(node.exchange, link)); targets[link] = e; - e->flow(); } else { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ @@ -253,7 +252,7 @@ void Session::incoming(pn_link_t* link, pn_delivery_t* delivery) received->begin(); Transfer t(delivery, shared_from_this()); received->end(t); - target->second->flow(); + if (target->second->needFlow()) out.activateOutput(); } } void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery) @@ -283,6 +282,9 @@ bool Session::dispatch() accepted(*i, true); } } + for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) { + if (t->second->flow()) output = true; + } return output; } @@ -299,24 +301,32 @@ void Session::close() deleted = true; } -void Queue::flow() +void Queue::handle(qpid::broker::Message& message) { - pn_link_flow(link, 1);//TODO: proper flow control + queue->deliver(message); + --window; } -void Queue::handle(qpid::broker::Message& message) +void Exchange::handle(qpid::broker::Message& message) { - queue->deliver(message); + DeliverableMessage deliverable(message, 0); + exchange->route(deliverable); + --window; } -void Exchange::flow() +bool Target::flow() { - pn_link_flow(link, 1);//TODO: proper flow control + bool issue = window < credit; + if (issue) { + pn_link_flow(link, credit - window);//TODO: proper flow control + window = credit; + } + return issue; } -void Exchange::handle(qpid::broker::Message& message) +bool Target::needFlow() { - DeliverableMessage deliverable(message, 0); - exchange->route(deliverable); + return window <= (credit/2); } + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index 551b4182e0..ca2094b965 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -215,6 +215,9 @@ void Translation::write(Outgoing& out) //write bare message qpid::amqp::CharSequence bareMessage = message->getBareMessage(); if (bareMessage.size) out.write(bareMessage.data, bareMessage.size); + //write footer: + qpid::amqp::CharSequence footer = message->getFooter(); + if (footer.size) out.write(footer.data, footer.size); } else { const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding()); if (transfer) { diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index cac4434c48..db7a0f02d5 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -116,11 +116,6 @@ void MessageTransfer::computeRequiredCredit() requiredCredit = sum.getSize(); cachedRequiredCredit = true; } -uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg) -{ - //TODO: may need to reflect annotations and other modifications in this also - return get(msg).getRequiredCredit(); -} qpid::framing::FrameSet& MessageTransfer::getFrames() { diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index 590e389518..9e432235e6 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -109,7 +109,6 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro QPID_BROKER_EXTERN bool isLastQMFResponse(const std::string correlation) const; static bool isImmediateDeliveryRequired(const qpid::broker::Message& message); - static uint32_t getRequiredCredit(const qpid::broker::Message&); static MessageTransfer& get(qpid::broker::Message& message) { return *dynamic_cast<MessageTransfer*>(&message.getEncoding()); } diff --git a/qpid/cpp/src/qpid/client/TCPConnector.cpp b/qpid/cpp/src/qpid/client/TCPConnector.cpp index b92f342b74..783742764b 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.cpp +++ b/qpid/cpp/src/qpid/client/TCPConnector.cpp @@ -151,6 +151,11 @@ void TCPConnector::socketClosed(AsynchIO&, const Socket&) { shutdownHandler->shutdown(); } +void TCPConnector::connectAborted() { + connector->stop(); + connectFailed("Connection timedout"); +} + void TCPConnector::abort() { // Can't abort a closed connection if (!closed) { @@ -159,8 +164,7 @@ void TCPConnector::abort() { aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); } else if (connector) { // We're still connecting - connector->stop(); - connectFailed("Connection timedout"); + connector->requestCallback(boost::bind(&TCPConnector::connectAborted, this)); } } } diff --git a/qpid/cpp/src/qpid/client/TCPConnector.h b/qpid/cpp/src/qpid/client/TCPConnector.h index a90dffd3ef..63af3b878a 100644 --- a/qpid/cpp/src/qpid/client/TCPConnector.h +++ b/qpid/cpp/src/qpid/client/TCPConnector.h @@ -80,6 +80,7 @@ class TCPConnector : public Connector, public sys::Codec void close(); void send(framing::AMQFrame& frame); void abort(); + void connectAborted(); void setInputHandler(framing::InputHandler* handler); void setShutdownHandler(sys::ShutdownHandler* handler); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 530211ced4..4b3f1d49c1 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -71,7 +71,7 @@ class HaBroker : public management::Manageable void initialize(); // Implement Manageable. - qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; } + qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObject; } management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); diff --git a/qpid/cpp/src/qpid/management/Manageable.cpp b/qpid/cpp/src/qpid/management/Manageable.cpp index 651215ffb5..322ec16656 100644 --- a/qpid/cpp/src/qpid/management/Manageable.cpp +++ b/qpid/cpp/src/qpid/management/Manageable.cpp @@ -41,6 +41,16 @@ string Manageable::StatusText (status_t status, string text) return "??"; } +ManagementObject* Manageable::GetManagementObject(void) const +{ + return 0; +} + +ManagementObject::shared_ptr Manageable::GetManagementObjectShared() const +{ + return ManagementObject::shared_ptr(); +} + Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&) { return STATUS_UNKNOWN_METHOD; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 7b8808c0a0..3f647ba052 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -698,7 +698,7 @@ void ManagementAgent::periodicProcessing (void) // if (publish) { uint64_t uptime = sys::Duration(startTime, sys::now()); - boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime); qpid::sys::MemStat::loadMemInfo(memstat.get()); } @@ -1722,7 +1722,7 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply string label; uint32_t requestedBrokerBank, requestedAgentBank; uint32_t assignedBank; - ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObjectShared()->getObjectId(); Uuid systemId; moveNewObjects(); @@ -1754,7 +1754,7 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get())); agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); - agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); + agent->mgmtObject->set_registeredTo (broker->GetManagementObjectShared()->getObjectId()); agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data()); agent->mgmtObject->set_brokerBank (brokerBank); agent->mgmtObject->set_agentBank (assignedBank); @@ -1831,7 +1831,7 @@ void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime); } @@ -1945,7 +1945,7 @@ void ManagementAgent::handleGetQuery(const string& body, const string& rte, cons if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime); } /* diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 7f1a2e3e66..9df5825e32 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -211,7 +211,7 @@ private: ObjectId connectionRef; qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject; RemoteAgent(ManagementAgent& _agent) : agent(_agent) {} - ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } + ManagementObject::shared_ptr GetManagementObjectShared (void) const { return mgmtObject; } virtual ~RemoteAgent (); void mapEncode(qpid::types::Variant::Map& _map) const; diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 67c7f29448..b2a9b979b6 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -53,7 +53,8 @@ ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Va writeHeader(false), readHeader(false), haveOutput(false), - state(DISCONNECTED) + state(DISCONNECTED), + codecSwitch(*this) { if (pn_transport_bind(engine, connection)) { //error @@ -149,7 +150,14 @@ void ConnectionContext::close() qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); if (state != CONNECTED) return; if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){ + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + //wait for outstanding sends to settle + while (!i->second->settled()) { + QPID_LOG(debug, "Waiting for sends to settle before closing"); + wait();//wait until message has been confirmed + } + + if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { pn_session_close(i->second->session); } @@ -181,6 +189,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach? + wakeupDriver(); } return true; } else { @@ -188,12 +197,24 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); pn_link_drain(lnk->receiver, 0); wakeupDriver(); - while (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)) { - QPID_LOG(notice, "Waiting for credit to be drained: " << (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver))); + while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) { + QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); wait(); } + if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { + pn_link_flow(lnk->receiver, lnk->capacity); + } + } + if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (lnk->capacity) { + pn_link_flow(lnk->receiver, 1); + wakeupDriver(); + } + return true; + } else { + return false; } - return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE); } } @@ -322,6 +343,7 @@ void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); receiver->setCapacity(capacity); pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); + wakeupDriver(); } uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) { @@ -543,13 +565,48 @@ bool ConnectionContext::useSasl() qpid::sys::Codec& ConnectionContext::getCodec() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - if (sasl.get()) { - qpid::sys::Codec* c = sasl->getCodec(); - if (c) return *c; - lock.notifyAll(); + return codecSwitch; +} + +ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {} +std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + size_t decoded = 0; + if (parent.sasl.get() && !parent.sasl->authenticated()) { + decoded = parent.sasl->decode(buffer, size); + if (!parent.sasl->authenticated()) return decoded; } - return *this; + if (decoded < size) { + if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded); + else decoded += parent.decode(buffer+decoded, size-decoded); + } + return decoded; } +std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + size_t encoded = 0; + if (parent.sasl.get() && parent.sasl->canEncode()) { + encoded += parent.sasl->encode(buffer, size); + if (!parent.sasl->authenticated()) return encoded; + } + if (encoded < size) { + if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded); + else encoded += parent.encode(buffer+encoded, size-encoded); + } + return encoded; +} +bool ConnectionContext::CodecSwitch::canEncode() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + if (parent.sasl.get()) { + if (parent.sasl->canEncode()) return true; + else if (!parent.sasl->authenticated()) return false; + else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode(); + } + return parent.canEncode(); +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index d9da6551b3..3718184365 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -123,6 +123,17 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag CONNECTED } state; std::auto_ptr<Sasl> sasl; + class CodecSwitch : public qpid::sys::Codec + { + public: + CodecSwitch(ConnectionContext&); + std::size_t decode(const char* buffer, std::size_t size); + std::size_t encode(char* buffer, std::size_t size); + bool canEncode(); + private: + ConnectionContext& parent; + }; + CodecSwitch codecSwitch; void wait(); void wakeupDriver(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 8034df311a..414793c7fd 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -120,18 +120,20 @@ void ReceiverContext::configure(pn_terminus_t* source) const helper.setNodeProperties(source); } - //filter: - pn_data_t* filter = pn_terminus_filter(source); - pn_data_put_map(filter); - pn_data_enter(filter); - pn_data_put_symbol(filter, convert("subject")); - //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved - //pn_data_put_described(filter); - //pn_data_enter(filter); - //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject())); - pn_data_put_string(filter, convert(address.getSubject())); - //pn_data_exit(filter); - pn_data_exit(filter); + if (!address.getSubject().empty()) { + //filter: + pn_data_t* filter = pn_terminus_filter(source); + pn_data_put_map(filter); + pn_data_enter(filter); + pn_data_put_symbol(filter, convert("subject")); + //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved + //pn_data_put_described(filter); + //pn_data_enter(filter); + //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject())); + pn_data_put_string(filter, convert(address.getSubject())); + //pn_data_exit(filter); + pn_data_exit(filter); + } } bool ReceiverContext::isClosed() const diff --git a/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp b/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp index af13697c20..a8bae1adda 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp @@ -58,7 +58,7 @@ std::size_t Sasl::encode(char* buffer, std::size_t size) encoded += writeProtocolHeader(buffer, size); writeHeader = !encoded; } - if (state == NONE && encoded < size) { + if (encoded < size) { encoded += write(buffer + encoded, size - encoded); } haveOutput = (encoded == size); @@ -135,14 +135,9 @@ void Sasl::outcome(uint8_t result) context.activateOutput(); } -qpid::sys::Codec* Sasl::getCodec() +qpid::sys::Codec* Sasl::getSecurityLayer() { - switch (state) { - case SUCCEEDED: return static_cast<qpid::sys::Codec*>(securityLayer.get()); - case FAILED: throw qpid::messaging::UnauthorizedAccess("Failed to authenticate"); - case NONE: return static_cast<qpid::sys::Codec*>(this); - } - return 0; + return securityLayer.get(); } bool Sasl::authenticated() diff --git a/qpid/cpp/src/qpid/messaging/amqp/Sasl.h b/qpid/cpp/src/qpid/messaging/amqp/Sasl.h index 3a2f2e9ffc..6657779fdc 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/Sasl.h +++ b/qpid/cpp/src/qpid/messaging/amqp/Sasl.h @@ -47,7 +47,7 @@ class Sasl : public qpid::sys::Codec, qpid::amqp::SaslClient bool canEncode(); bool authenticated(); - qpid::sys::Codec* getCodec(); + qpid::sys::Codec* getSecurityLayer(); std::string getAuthenticatedUsername(); private: ConnectionContext& context; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 95398fea6f..96c4437b89 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -80,7 +80,7 @@ const std::string& SenderContext::getTarget() const SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) { - if (processUnsettled() < capacity) { + if (processUnsettled() < capacity && pn_link_credit(sender)) { deliveries.push_back(Delivery(nextId++)); Delivery& delivery = deliveries.back(); delivery.encode(MessageImplAccess::get(message), address); @@ -95,6 +95,7 @@ uint32_t SenderContext::processUnsettled() { //remove accepted messages from front of deque while (!deliveries.empty() && deliveries.front().accepted()) { + deliveries.front().settle(); deliveries.pop_front(); } return deliveries.size(); @@ -336,7 +337,10 @@ bool SenderContext::Delivery::accepted() { return pn_delivery_remote_state(token) == PN_ACCEPTED; } - +void SenderContext::Delivery::settle() +{ + pn_delivery_settle(token); +} void SenderContext::configure() const { configure(pn_link_target(sender)); @@ -350,4 +354,10 @@ void SenderContext::configure(pn_terminus_t* target) const helper.setNodeProperties(target); } } + +bool SenderContext::settled() +{ + return processUnsettled() == 0; +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 0202d6aa4b..3595379e70 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -53,6 +53,7 @@ class SenderContext void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&); void send(pn_link_t*); bool accepted(); + void settle(); private: int32_t id; pn_delivery_t* token; @@ -69,6 +70,7 @@ class SenderContext const std::string& getTarget() const; Delivery* send(const qpid::messaging::Message& message); void configure() const; + bool settled(); private: friend class ConnectionContext; typedef std::deque<Delivery> Deliveries; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 8b3feb129a..9bdc658bc7 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -144,4 +144,13 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c } } +bool SessionContext::settled() +{ + bool result = true; + for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { + if (!i->second->settled()) result = false; + } + return result; +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index fbc8731230..eca30a0e97 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -59,6 +59,7 @@ class SessionContext boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout); uint32_t getReceivable(); uint32_t getUnsettledAcks(); + bool settled(); private: friend class ConnectionContext; typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h index 8eed72d40d..a531ee1dbb 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIO.h +++ b/qpid/cpp/src/qpid/sys/AsynchIO.h @@ -58,6 +58,7 @@ class AsynchConnector { public: typedef boost::function1<void, const Socket&> ConnectedCallback; typedef boost::function3<void, const Socket&, int, const std::string&> FailedCallback; + typedef boost::function1<void, AsynchConnector&> RequestCallback; // Call create() to allocate a new AsynchConnector object with the // specified poller, addressing, and callbacks. @@ -72,6 +73,7 @@ public: FailedCallback failCb); virtual void start(boost::shared_ptr<Poller> poller) = 0; virtual void stop() {}; + virtual void requestCallback(RequestCallback) = 0; protected: AsynchConnector() {} virtual ~AsynchConnector() {} diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index 7eb25fd861..2c17cc001c 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -143,6 +143,7 @@ class AsynchConnector : public qpid::sys::AsynchConnector, private: void connComplete(DispatchHandle& handle); + void requestedCall(RequestCallback rCb); private: ConnectedCallback connCallback; @@ -158,6 +159,7 @@ public: FailedCallback failCb); void start(Poller::shared_ptr poller); void stop(); + void requestCallback(RequestCallback rCb); }; AsynchConnector::AsynchConnector(const Socket& s, @@ -191,6 +193,18 @@ void AsynchConnector::stop() stopWatch(); } +void AsynchConnector::requestCallback(RequestCallback callback) { + // TODO creating a function object every time isn't all that + // efficient - if this becomes heavily used do something better (what?) + assert(callback); + DispatchHandle::call(boost::bind(&AsynchConnector::requestedCall, this, callback)); +} + +void AsynchConnector::requestedCall(RequestCallback callback) { + assert(callback); + callback(*this); +} + void AsynchConnector::connComplete(DispatchHandle& h) { int errCode = socket.getError(); diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 7dcc5c5846..d8aa6efda7 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -198,6 +198,7 @@ public: ConnectedCallback connCb, FailedCallback failCb = 0); void start(Poller::shared_ptr poller); + void requestCallback(RequestCallback rCb); }; AsynchConnector::AsynchConnector(const Socket& sock, @@ -223,6 +224,13 @@ void AsynchConnector::start(Poller::shared_ptr) } } +// This can never be called in the current windows code as connect +// is blocking and requestCallback only makes sense if connect is +// non-blocking with the results returned via a poller callback. +void AsynchConnector::requestCallback(RequestCallback rCb) +{ +} + } // namespace windows AsynchAcceptor* AsynchAcceptor::create(const Socket& s, diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp index 7bbcd4de1b..3e2a5fb36c 100644 --- a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -52,13 +52,14 @@ private: PollableCondition& parent; boost::shared_ptr<sys::Poller> poller; LONG isSet; + LONG isDispatching; }; PollableConditionPrivate::PollableConditionPrivate(const sys::PollableCondition::Callback& cb, sys::PollableCondition& parent, const boost::shared_ptr<sys::Poller>& poller) : IOHandle(INVALID_SOCKET, boost::bind(&PollableConditionPrivate::dispatch, this, _1)), - cb(cb), parent(parent), poller(poller), isSet(0) + cb(cb), parent(parent), poller(poller), isSet(0), isDispatching(0) { } @@ -77,7 +78,12 @@ void PollableConditionPrivate::poke() void PollableConditionPrivate::dispatch(windows::AsynchIoResult *result) { delete result; // Poller::monitorHandle() allocates this + // If isDispatching is already set, just return. Else, enter. + if (::InterlockedCompareExchange(&isDispatching, 1, 0) == 1) + return; cb(parent); + LONG oops = ::InterlockedDecrement(&isDispatching); // Result must be 0 + assert(!oops); if (isSet) poke(); } diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp index 9c21e51a18..71e1945d94 100644 --- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp +++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp @@ -123,7 +123,7 @@ class TestManageable : public qpid::management::Manageable mgmtObj = tmp; }; ~TestManageable() { mgmtObj.reset(); } - management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObj; }; + management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObj; }; static void validateTestObjectProperties(_qmf::TestObject& to) { // verify the default values are as expected. We don't check 'string1', @@ -209,11 +209,11 @@ QPID_AUTO_TEST_CASE(v1ObjPublish) // create a manageable test object TestManageable *tm = new TestManageable(agent, std::string("obj1")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); + uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent->addObject(tm->GetManagementObject(), 1); + agent->addObject(tm->GetManagementObjectShared(), 1); // wait for the object to be published Message m1; @@ -234,7 +234,7 @@ QPID_AUTO_TEST_CASE(v1ObjPublish) // destroy the object - tm->GetManagementObject()->resourceDestroy(); + tm->GetManagementObjectShared()->resourceDestroy(); // wait for the deleted object to be published @@ -272,9 +272,9 @@ QPID_AUTO_TEST_CASE(v2ObjPublish) TestManageable *tm = new TestManageable(agent, std::string("obj2")); - Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#"); + Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObjectShared()->getPackageName(), "#"); - agent->addObject(tm->GetManagementObject(), "testobj-1"); + agent->addObject(tm->GetManagementObjectShared(), "testobj-1"); // wait for the object to be published Message m1; @@ -295,7 +295,7 @@ QPID_AUTO_TEST_CASE(v2ObjPublish) // destroy the object - tm->GetManagementObject()->resourceDestroy(); + tm->GetManagementObjectShared()->resourceDestroy(); // wait for the deleted object to be published @@ -335,11 +335,11 @@ QPID_AUTO_TEST_CASE(v1ExportDelObj) // create a manageable test object TestManageable *tm = new TestManageable(agent, std::string("myObj")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); + uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent->addObject(tm->GetManagementObject(), 1); + agent->addObject(tm->GetManagementObjectShared(), 1); // wait for the object to be published Message m1; @@ -352,7 +352,7 @@ QPID_AUTO_TEST_CASE(v1ExportDelObj) // destroy the object, then immediately export (before the next poll cycle) ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - tm->GetManagementObject()->resourceDestroy(); + tm->GetManagementObjectShared()->resourceDestroy(); agent->exportDeletedObjects( delObjs ); BOOST_CHECK(delObjs.size() == 1); @@ -399,11 +399,11 @@ QPID_AUTO_TEST_CASE(v1ImportDelObj) // create a manageable test object TestManageable *tm = new TestManageable(agent, std::string("anObj")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); + uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent->addObject(tm->GetManagementObject(), 1); + agent->addObject(tm->GetManagementObjectShared(), 1); // wait for the object to be published Message m1; @@ -416,7 +416,7 @@ QPID_AUTO_TEST_CASE(v1ImportDelObj) // destroy the object, then immediately export (before the next poll cycle) ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - tm->GetManagementObject()->resourceDestroy(); + tm->GetManagementObjectShared()->resourceDestroy(); agent->exportDeletedObjects( delObjs ); BOOST_CHECK(delObjs.size() == 1); @@ -478,8 +478,8 @@ QPID_AUTO_TEST_CASE(v1ExportFastDelObj) // add, then immediately delete and export the object... ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - agent->addObject(tm->GetManagementObject(), 999); - tm->GetManagementObject()->resourceDestroy(); + agent->addObject(tm->GetManagementObjectShared(), 999); + tm->GetManagementObjectShared()->resourceDestroy(); agent->exportDeletedObjects( delObjs ); BOOST_CHECK(delObjs.size() == 1); @@ -511,8 +511,8 @@ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj) // FOR ALL OBJECTS, so objLen will be the same. Otherwise the // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length). TestManageable *tm = new TestManageable(agent, key.str()); - objLen = tm->GetManagementObject()->writePropertiesSize(); - agent->addObject(tm->GetManagementObject(), i + 1); + objLen = tm->GetManagementObjectShared()->writePropertiesSize(); + agent->addObject(tm->GetManagementObjectShared(), i + 1); tmv.push_back(tm); } @@ -531,7 +531,7 @@ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj) uint32_t delCount = 0; for (size_t i = 0; i < objCount; i += 2) { - tmv[i]->GetManagementObject()->resourceDestroy(); + tmv[i]->GetManagementObjectShared()->resourceDestroy(); delCount++; } @@ -604,8 +604,8 @@ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj) std::stringstream key; key << "testobj-" << i; TestManageable *tm = new TestManageable(agent, key.str()); - if (tm->GetManagementObject()->writePropertiesSize()) {} - agent->addObject(tm->GetManagementObject(), key.str()); + if (tm->GetManagementObjectShared()->writePropertiesSize()) {} + agent->addObject(tm->GetManagementObjectShared(), key.str()); tmv.push_back(tm); } @@ -624,7 +624,7 @@ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj) uint32_t delCount = 0; for (size_t i = 0; i < objCount; i += 2) { - tmv[i]->GetManagementObject()->resourceDestroy(); + tmv[i]->GetManagementObjectShared()->resourceDestroy(); delCount++; } @@ -689,12 +689,12 @@ QPID_AUTO_TEST_CASE(v2RapidRestoreObj) TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); - Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); + Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObjectShared()->getPackageName(), "#"); // add, then immediately delete and re-add a copy of the object - agent->addObject(tm1->GetManagementObject(), "testobj-1"); - tm1->GetManagementObject()->resourceDestroy(); - agent->addObject(tm2->GetManagementObject(), "testobj-1"); + agent->addObject(tm1->GetManagementObjectShared(), "testobj-1"); + tm1->GetManagementObjectShared()->resourceDestroy(); + agent->addObject(tm2->GetManagementObjectShared(), "testobj-1"); // expect: a delete notification, then an update notification TestObjectVector objs; diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index f3a639c234..55387f0091 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -154,7 +154,7 @@ receiver_SOURCES = \ receiver.cpp \ TestOptions.h \ ConnectionOptions.h -receiver_LDADD = $(lib_client) +receiver_LDADD = $(lib_client) -lboost_program_options -lqpidcommon qpidexectest_PROGRAMS += sender sender_SOURCES = \ @@ -162,7 +162,7 @@ sender_SOURCES = \ TestOptions.h \ ConnectionOptions.h \ Statistics.cpp -sender_LDADD = $(lib_messaging) +sender_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes -lqpidclient qpidexectest_PROGRAMS += qpid-receive qpid_receive_SOURCES = \ @@ -171,7 +171,7 @@ qpid_receive_SOURCES = \ ConnectionOptions.h \ Statistics.h \ Statistics.cpp -qpid_receive_LDADD = $(lib_messaging) +qpid_receive_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes qpidexectest_PROGRAMS += qpid-send qpid_send_SOURCES = \ @@ -180,42 +180,42 @@ qpid_send_SOURCES = \ ConnectionOptions.h \ Statistics.h \ Statistics.cpp -qpid_send_LDADD = $(lib_messaging) +qpid_send_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes qpidexectest_PROGRAMS+=qpid-perftest qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES) -qpid_perftest_LDADD=$(lib_client) +qpid_perftest_LDADD=$(lib_client) -lboost_program_options -lqpidcommon qpidexectest_PROGRAMS+=qpid-txtest qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES) qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h -qpid_txtest_LDADD=$(lib_client) +qpid_txtest_LDADD=$(lib_client) -lboost_program_options -lqpidcommon qpidexectest_PROGRAMS+=qpid-latency-test qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h -qpid_latency_test_LDADD=$(lib_client) +qpid_latency_test_LDADD=$(lib_client) -lboost_program_options -lqpidcommon qpidexectest_PROGRAMS+=qpid-client-test qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h -qpid_client_test_LDADD=$(lib_client) +qpid_client_test_LDADD=$(lib_client) -lboost_program_options -lqpidcommon qpidexectest_PROGRAMS+=qpid-topic-listener qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h -qpid_topic_listener_LDADD=$(lib_client) +qpid_topic_listener_LDADD=$(lib_client) -lboost_program_options -lqpidcommon qpidexectest_PROGRAMS+=qpid-topic-publisher qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h -qpid_topic_publisher_LDADD=$(lib_client) +qpid_topic_publisher_LDADD=$(lib_client) -lboost_program_options -lqpidcommon qpidexectest_PROGRAMS+=qpid-ping qpid_ping_INCLUDES=$(PUBLIC_INCLUDES) qpid_ping_SOURCES=qpid-ping.cpp test_tools.h TestOptions.h ConnectionOptions.h -qpid_ping_LDADD=$(lib_client) +qpid_ping_LDADD=$(lib_client) -lboost_program_options -lqpidcommon # # Other test programs diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp index d538a8181c..e6010a8e00 100644 --- a/qpid/cpp/src/tests/testagent.cpp +++ b/qpid/cpp/src/tests/testagent.cpp @@ -59,7 +59,7 @@ class CoreClass : public Manageable { string name; ManagementAgent* agent; - _qmf::Parent::shared_ptr mgmtObject; + _qmf::Parent* mgmtObject; std::vector<ChildClass*> children; Mutex vectorLock; @@ -68,7 +68,7 @@ public: CoreClass(ManagementAgent* agent, string _name); ~CoreClass() { mgmtObject->resourceDestroy(); } - ManagementObject::shared_ptr GetManagementObject(void) const + ManagementObject* GetManagementObject(void) const { return mgmtObject; } void doLoop(); @@ -78,14 +78,14 @@ public: class ChildClass : public Manageable { string name; - _qmf::Child::shared_ptr mgmtObject; + _qmf::Child* mgmtObject; public: ChildClass(ManagementAgent* agent, CoreClass* parent, string name); ~ChildClass() { mgmtObject->resourceDestroy(); } - ManagementObject::shared_ptr GetManagementObject(void) const + ManagementObject* GetManagementObject(void) const { return mgmtObject; } void doWork() @@ -97,9 +97,9 @@ public: CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) { static uint64_t persistId = 0x111222333444555LL; - mgmtObject = _qmf::Parent::shared_ptr(new _qmf::Parent(agent, this, name)); + mgmtObject = new _qmf::Parent(agent, this, name); - agent->addObject(mgmtObject.get(), persistId++); + agent->addObject(mgmtObject, persistId++); mgmtObject->set_state("IDLE"); } @@ -146,9 +146,9 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) { - mgmtObject = _qmf::Child::shared_ptr(new _qmf::Child(agent, this, parent, name)); + mgmtObject = new _qmf::Child(agent, this, parent, name); - agent->addObject(mgmtObject.get()); + agent->addObject(mgmtObject); } diff --git a/qpid/cpp/src/tests/testagent.mk b/qpid/cpp/src/tests/testagent.mk index 25cf43d71e..9f530621c6 100644 --- a/qpid/cpp/src/tests/testagent.mk +++ b/qpid/cpp/src/tests/testagent.mk @@ -46,6 +46,6 @@ testagent-testagent.$(OBJEXT): $(TESTAGENT_GEN_SRC) qpidexectest_PROGRAMS+=testagent testagent_CXXFLAGS=$(CXXFLAGS) -Itestagent_gen testagent_SOURCES=testagent.cpp $(TESTAGENT_GEN_SRC) -testagent_LDADD=$(top_builddir)/src/libqmf.la +testagent_LDADD=$(top_builddir)/src/libqmf.la -lqpidcommon -lqpidtypes -lqpidclient EXTRA_DIST+=testagent.xml diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Getting-Started.xml b/qpid/doc/book/src/java-broker/Java-Broker-Getting-Started.xml index 8bdd4dc238..630c27ce89 100644 --- a/qpid/doc/book/src/java-broker/Java-Broker-Getting-Started.xml +++ b/qpid/doc/book/src/java-broker/Java-Broker-Getting-Started.xml @@ -36,22 +36,25 @@ <title>Starting/Stopping on Windows</title> <para>Firstly change to the installation directory used during the <link linkend="Java-Broker-Installation-InstallationWindows">installation</link> and ensure that the <link linkend="Java-Broker-Installation-InstallationWindows-SettingQPIDWORK">QPID_WORK environment variable is set</link>.</para> - <para>Now use the qpid-server.bat to start the server</para> + <para>Now use the <command>qpid-server.bat</command> to start the server</para> <programlisting><![CDATA[bin\qpid-server.bat]]></programlisting> <para>Output similar to the following will be seen:</para> <screen>[Broker] BRK-1006 : Using configuration : C:\qpid\qpid-broker-&qpidCurrentRelease;\etc\config.xml [Broker] BRK-1007 : Using logging configuration : C:\qpid\qpid-broker-&qpidCurrentRelease;\etc\log4j.xml -[Broker] MNG-1001 : Startup -[Broker] MNG-1002 : Starting : RMI Registry : Listening on port 8999 -[Broker] MNG-1002 : Starting : JMX RMIConnectorServer : Listening on port 9099 -[Broker] MNG-1004 : Ready -[Broker] BRK-1001 : Startup : Version: &qpidCurrentRelease; Build: 1363863 +[Broker] BRK-1001 : Startup : Version: &qpidCurrentRelease; Build: 1411386 [Broker] BRK-1010 : Platform : JVM : Sun Microsystems Inc. version: 1.6.0_24-b07 OS : Windows 7 version: 6.1 arch: amd64 [Broker] BRK-1011 : Maximum Memory : 1,069,416,448 bytes +[Broker] MNG-1001 : Web Management Startup +[Broker] MNG-1002 : Starting : HTTP : Listening on port 8080 +[Broker] MNG-1004 : Web Management Ready +[Broker] MNG-1001 : JMX Management Startup +[Broker] MNG-1002 : Starting : RMI Registry : Listening on port 8999 +[Broker] MNG-1002 : Starting : JMX RMIConnectorServer : Listening on port 9099 +[Broker] MNG-1004 : JMX Management Ready [Broker] BRK-1002 : Starting : Listening on TCP port 5672 [Broker] BRK-1004 : Qpid Broker Ready</screen> - <para>The BRK-1004 message confirms that the Broker is ready for work. The MNG-1002 and BRK-1004 confirm the ports to - which the Broker is listening (for JMX management and AMQP respectively).</para> + <para>The BRK-1004 message confirms that the Broker is ready for work. The MNG-1002 and BRK-1002 confirm the ports to + which the Broker is listening (for HTTP/JMX management and AMQP respectively).</para> <para>To stop the Broker, use Control-C or use the Shutdown MBean made from the <xref linkend="Java-Broker-Configuring-And-Managing-JMX"/></para> </section> @@ -64,17 +67,20 @@ <para>Output similar to the following will be seen:</para> <screen>[Broker] BRK-1006 : Using configuration : /usr/local/qpid/qpid-broker-&qpidCurrentRelease;/etc/config.xml [Broker] BRK-1007 : Using logging configuration : /usr/local/qpid/qpid-broker-&qpidCurrentRelease;/etc/log4j.xml -[Broker] MNG-1001 : Startup +[Broker] BRK-1001 : Startup : Version: &qpidCurrentRelease; Build: 1411386 +[Broker] BRK-1010 : Platform : JVM : Apple Inc. version: 1.6.0_35-b10-428-11M3811 OS : Mac OS X version: 10.8.2 arch: x86_64 +[Broker] BRK-1011 : Maximum Memory : 1,070,399,488 bytes +[Broker] MNG-1001 : Web Management Startup +[Broker] MNG-1002 : Starting : HTTP : Listening on port 8080 +[Broker] MNG-1004 : Web Management Ready +[Broker] MNG-1001 : JMX Management Startup [Broker] MNG-1002 : Starting : RMI Registry : Listening on port 8999 [Broker] MNG-1002 : Starting : JMX RMIConnectorServer : Listening on port 9099 -[Broker] MNG-1004 : Ready -[Broker] BRK-1001 : Startup : Version: &qpidCurrentRelease; Build: 1363863 -[Broker] BRK-1010 : Platform : JVM : Apple Inc. version: 1.6.0_35-b10-428-11M3811 OS : Mac OS X version: 10.8.2 arch: x86_64 -[Broker] BRK-1011 : Maximum Memory : 1,069,416,448 bytes +[Broker] MNG-1004 : JMX Management Ready [Broker] BRK-1002 : Starting : Listening on TCP port 5672 [Broker] BRK-1004 : Qpid Broker Ready</screen> - <para>The BRK-1004 message confirms that the Broker is ready for work. The MNG-1002 and BRK-1004 confirm the ports to - which the Broker is listening (for JMX management and AMQP respectively).</para> + <para>The BRK-1004 message confirms that the Broker is ready for work. The MNG-1002 and BRK-1002 confirm the ports to + which the Broker is listening (for HTTP/JMX management and AMQP respectively).</para> <para>To stop the Broker, use Control-C from the controlling shell, use the <command>bin/qpid.stop</command> script, or use <command>kill -TERM <pid></command> or the Shutdown MBean from <xref linkend="Java-Broker-Configuring-And-Managing-JMX"/></para> diff --git a/qpid/doc/book/src/java-broker/Java-Broker-High-Availability.xml b/qpid/doc/book/src/java-broker/Java-Broker-High-Availability.xml index 618533dc5f..7ea9dae38a 100644 --- a/qpid/doc/book/src/java-broker/Java-Broker-High-Availability.xml +++ b/qpid/doc/book/src/java-broker/Java-Broker-High-Availability.xml @@ -506,7 +506,7 @@ amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672?connectdelay=' <para>Qpid exposes the BDB HA store information via its JMX interface and provides APIs to remove a Node from the group, update a Node IP address, and assign a Node as the designated primary.</para> <para>An instance of the <classname>BDBHAMessageStore</classname> MBean is instantiated by the broker for the each virtualhost using the HA store.</para> - <para>The reference to this MBean can be obtained via JMX API using an ObjectName like <emphasis>org.apache.qpid:type=BDBHAMessageStore,name=<virtualhost name></emphasis> + <para>The reference to this MBean can be obtained via JMX API using an ObjectName like <emphasis>org.apache.qpid:type=BDBHAMessageStore,name="<virtualhost name>"</emphasis> where <virtualhost name> is the name of a specific virtualhost on the broker.</para> <table border="1"> <title>Mbean <classname>BDBHAMessageStore</classname> attributes</title> @@ -630,7 +630,7 @@ JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost: JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment); MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection(); -ObjectName queueObjectName = new ObjectName("org.apache.qpid:type=BDBHAMessageStore,name=test"); +ObjectName queueObjectName = new ObjectName("org.apache.qpid:type=BDBHAMessageStore,name=\"test\""); String state = (String)mbsc.getAttribute(queueObjectName, "NodeState"); System.out.println("Node state:" + state); diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Runtime-Producer-Transaction-Timeout.xml b/qpid/doc/book/src/java-broker/Java-Broker-Runtime-Producer-Transaction-Timeout.xml new file mode 100644 index 0000000000..04212d94ed --- /dev/null +++ b/qpid/doc/book/src/java-broker/Java-Broker-Runtime-Producer-Transaction-Timeout.xml @@ -0,0 +1,181 @@ +<?xml version="1.0" encoding="utf-8"?> +<!DOCTYPE entities [ +<!ENTITY % entities SYSTEM "commonEntities.xml"> +%entities; +]> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<section id="Java-Broker-Runtime-Producer-Transaction-Timeout"> + <title>Producer Transaction Timeout</title> + <section role="h2" id="Java-Broker-Runtime-Producer-Transaction-Timeout-GeneralInformation"> + <title>General Information</title> + <para> The transaction timeout mechanism is used to control broker resources when clients + producing messages using transactional sessions hang or otherwise become unresponsive, or simply + begin a transaction and keep using it without ever calling <ulink + url="&oracleJeeDocUrl;javax/jms/Session.html#commit">Session#commit()</ulink>.</para> + <para>Users can choose to configure an idleWarn or openWarn threshold, after which the identified + transaction should be logged as a WARN level alert as well as (more importantly) an idleClose or + openClose threshold after which the transaction and the connection it applies to will be + closed.</para> + <para>This feature is particularly useful in environments where the owner of the broker does not + have full control over the implementation of clients, such as in a shared services + deployment.</para> + <para>The following section provide more details on this feature and its use.</para> + </section> + <section role="h2" id="Java-Broker-Runtime-Producer-Transaction-Timeout-Purpose"> + <title>Purpose</title> + <para> This feature has been introduced to address the scenario where an open transaction on the + broker holds an open transaction on the persistent store. This can have undesirable consequences + if the store does not time out or close long-running transactions, such as with <link + linkend="Java-Broker-Stores-BDB-Store">BDB</link>. This can can result in a rapid increase in + disk usage size, bounded only by available space, due to growth of the transaction log. </para> + </section> + <section role="h2" id="Java-Broker-Runtime-Producer-Transaction-Timeout-Scope"> + <title>Scope</title> + <para>Note that only <ulink url="&oracleJeeDocUrl;javax/jms/MessageProducer.html" + >MessageProducer</ulink> clients will be affected by a transaction timeout, since store + transaction lifespan on a consumer only spans the execution of the call to Session#commit() and + there is no scope for a long-lived transaction to arise.</para> + <para>It is also important to note that the transaction timeout mechanism is purely a JMS + transaction timeout, and unrelated to any other timeouts in the Qpid client library and will have + no impact on any RDBMS your application may utilise.</para> + </section> + <section role="h2" id="Java-Broker-Runtime-Producer-Transaction-Timeout-Effect"> + <title>Effect</title> + <para>Full details of configuration options are provided in the sections that follow. This section + gives a brief overview of what the Transaction Timeout feature can do.</para> + <section role="h3" id="Java-Broker-Runtime-Producer-Transaction-Timeout-Effect-Broker-Side"> + <title>Broker Logging and Connection Close</title> + <para>When the openWarn or idleWarn specified threshold is exceeded, the broker will log a WARN + level alert with details of the connection and channel on which the threshold has been exceeded, + along with the age of the transaction.</para> + <para>When the openClose or idleClose specified threshold value is exceeded, the broker will + throw an exception back to the client connection via the <ulink + url="&oracleJeeDocUrl;javax/jms/ExceptionListener.html">ExceptionListener</ulink>, log the + action and then close the connection.</para> + <para>The example broker log output shown below is where the idleWarn threshold specified is + lower than the idleClose threshold and the broker therefore logs the idle transaction 3 times + before the close threshold is triggered and the connection closed out.</para> + <screen><![CDATA[CHN-1008 : Idle Transaction : 13,116 ms +CHN-1008 : Idle Transaction : 14,116 ms +CHN-1008 : Idle Transaction : 15,118 ms +CHN-1003 : Close]]> + </screen> + <para>The second example broker log output shown below illustrates the same mechanism operating + on an open transaction.</para> + <screen><![CDATA[ +CHN-1007 : Open Transaction : 12,406 ms +CHN-1007 : Open Transaction : 13,406 ms +CHN-1007 : Open Transaction : 14,406 ms +CHN-1003 : Close]]> + </screen> + </section> + <section role="h3" id="Java-Broker-Runtime-Producer-Transaction-Timeout-Effect-Client-Side"> + <title>Client Side Effect</title> + <para>After a Close threshold has been exceeded, the trigger client will receive this exception + on its <ulink url="&oracleJeeDocUrl;javax/jms/ExceptionListener.html">exception + listener</ulink>, prior to being disconnected:</para> + <computeroutput>org.apache.qpid.AMQConnectionClosedException: Error: Idle transaction timed out + [error code 506: resource error]</computeroutput> + <para>Any later attempt to use the connection will result in this exception being thrown:</para> + <screen><![CDATA[Producer: Caught an Exception: javax.jms.IllegalStateException: Object org.apache.qpid.client.AMQSession_0_8@129b0e1 has been closed + javax.jms.IllegalStateException: Object org.apache.qpid.client.AMQSession_0_8@129b0e1 has been closed + at org.apache.qpid.client.Closeable.checkNotClosed(Closeable.java:70) + at org.apache.qpid.client.AMQSession.checkNotClosed(AMQSession.java:555) + at org.apache.qpid.client.AMQSession.createBytesMessage(AMQSession.java:573)]]> + </screen> + <para>Thus clients must be able to handle this case successfully, reconnecting where required and + registering an exception listener on all connections. This is critical, and must be communicated + to client applications by any broker owner switching on transaction timeouts.</para> + </section> + + </section> + <section role="h2" id="Java-Broker-Runtime-Producer-Transaction-Timeout-Configuration"> + <title>Configuration</title> + <section role="h3" id="Java-Broker-Runtime-Producer-Transaction-Timeout-Configuration-Overview"> + <title>Configuration</title> + <para>Transaction timeouts are configurable separately on each defined virtual host, using the + virtualhosts.xml file.</para> + <para>We would recommend that only warnings are configured at first, which should allow broker + administrators to obtain an idea of the distribution of transaction lengths on their systems, + and configure production settings appropriately for both warning and closure. Ideally + establishing thresholds should be achieved in a representative UAT environment, with clients and + broker running, prior to any production deployment.</para> + <para>It is impossible to give suggested values, due to the large variation in usage depending on + the applications using a broker. However, clearly transactions should not span the expected + lifetime of any client application as this would indicate a hung client.</para> + <para>When configuring warning and closure timeouts, it should be noted that these only apply to + message producers that are connected to the broker, but that a timeout will cause the connection + to be closed - this disconnecting all producers and consumers created on that connection.</para> + <para>This should not be an issue for environments using Mule or Spring, where connection + factories can be configured appropriately to manage a single MessageProducer object per JMS + Session and Connection. Clients that use the JMS API directly should be aware that sessions + managing both consumers and producers, or multiple producers, will be affected by a single + producer hanging or leaving a transaction idle or open, and closed, and must take appropriate + action to handle that scenario.</para> + </section> + <section role="h3" + id="Java-Broker-Runtime-Producer-Transaction-Timeout-Configuration-Virtualhosts"> + <title>Virtualhosts.xml</title> + <para> The JMS transaction timeouts are configured on each virtual host defined in the XML + configuration files.</para> + <para> The default values for each of the parameters is 0, indicating that the particular check + is disabled.</para> + <para> Any or all of the parameters can be set, using the desired value in milliseconds, and will + be checked each time the housekeeping process runs, usually set to run every 30 seconds in + standard configuration. The meaning of each property is as follows:</para> + <para> + <itemizedlist> + <listitem> + <para>openWarn - the time a transaction can be open for (with activity occurring on it) after + which a warning alert will be issued.</para> + </listitem> + <listitem> + <para>openClose - the time a transaction can be open for before the connection it is on is + closed.</para> + </listitem> + <listitem> + <para>idleWarn - the time a transaction can be idle for (with no activity occurring on it) + after which a warning alert will be issued.</para> + </listitem> + <listitem> + <para>idleClose - the time a transaction can be idle for before the connection it is on is + closed.</para> + </listitem> + </itemizedlist> + </para> + <para> The virtualhosts configuration is shown below, and must occur inside the + //virtualhosts/virtualhost/name/ elements: </para> + <example> +<title>Configuring producer transaction timeout</title> + <programlisting><![CDATA[ +<transactionTimeout> + <openWarn>10000</openWarn> + <openClose>20000</openClose> + <idleWarn>5000</idleWarn> + <idleClose>15000</idleClose> +</transactionTimeout> + ]]></programlisting> + </example> + </section> + </section> +</section> diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Runtime.xml b/qpid/doc/book/src/java-broker/Java-Broker-Runtime.xml index 236ef82ecd..6b21fd15c2 100644 --- a/qpid/doc/book/src/java-broker/Java-Broker-Runtime.xml +++ b/qpid/doc/book/src/java-broker/Java-Broker-Runtime.xml @@ -25,4 +25,5 @@ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Java-Broker-Runtime-Log-Files.xml"/> <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Java-Broker-Runtime-Alerts.xml"/> <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Java-Broker-Runtime-Disk-Space-Management.xml"/> + <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Java-Broker-Runtime-Producer-Transaction-Timeout.xml"/> </chapter> diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Security-Authentication-Providers.xml b/qpid/doc/book/src/java-broker/Java-Broker-Security-Authentication-Providers.xml index 96b6f99185..0974441ae5 100644 --- a/qpid/doc/book/src/java-broker/Java-Broker-Security-Authentication-Providers.xml +++ b/qpid/doc/book/src/java-broker/Java-Broker-Security-Authentication-Providers.xml @@ -38,18 +38,44 @@ </section> - <section> + <section id="LDAPAuthManager"> <title>LDAP</title> + + <para> + LDAP authentication can be configured using the <simple-ldap-auth-manager> element + within the <security> section. An example of how to configure this is shown below. + Please note this example also configures an unused <pd-auth-manager> to use an empty + password file, this is a workaround for an issue relating to registration of security providers. + </para> + + <para> + <emphasis>NOTE: When using LDAP authentication, you must also use SSL on the brokers AMQP messaging and + JMX/HTTP management ports in order to protect passwords during transmission to the broker.</emphasis> + </para> <example> <title>Configuring LDAP authentication</title> <programlisting><![CDATA[ <security> - <simple-ldap-auth-manager> - <provider-url>ldaps://example.com:636/</provider-url> - <search-context>dc=example\,dc=com</search-context> - <search-filter>(uid={0})</search-filter> - </simple-ldap-auth-manager> - ... + <default-auth-manager>SimpleLDAPAuthenticationManager</default-auth-manager> + <simple-ldap-auth-manager> + <provider-url>ldaps://example.com:636/</provider-url> + <search-context>dc=example\,dc=com</search-context> + <search-filter>(uid={0})</search-filter> + </simple-ldap-auth-manager> + + <!-- Unused pd-auth-manager, a workaround to register the necessary security providers --> + <pd-auth-manager> + <principal-database> + <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class> + <attributes> + <attribute> + <name>passwordFile</name> + <value>${conf}/emptyPasswdFile</value> + </attribute> + </attributes> + </principal-database> + <pd-auth-manager> + ... </security>]]></programlisting> </example> diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Stores-BDB-Store.xml b/qpid/doc/book/src/java-broker/Java-Broker-Stores-BDB-Store.xml index 9ce90f6529..c16d9aa227 100644 --- a/qpid/doc/book/src/java-broker/Java-Broker-Stores-BDB-Store.xml +++ b/qpid/doc/book/src/java-broker/Java-Broker-Stores-BDB-Store.xml @@ -52,16 +52,14 @@ <title>Oracle BDB JE jar installation</title> <para> If you wish to use the BDBMessageStore, copy the je-&oracleBdbProductVersion;.jar from within the release - downloaded <link linkend="Java-Broker-Stores-BDB-Store-BDBJE-Download">above</link> into an 'opt' sub-directory + downloaded <link linkend="Java-Broker-Stores-BDB-Store-BDBJE-Download">above</link> into the 'opt' sub-directory of the brokers 'lib' directory. </para> <programlisting>Unix: -mkdir qpid-broker-&qpidCurrentRelease;/lib/opt cp je-&oracleBdbProductVersion;.jar qpid-broker-&qpidCurrentRelease;/lib/opt</programlisting> <programlisting>Windows: -mkdir qpid-broker-&qpidCurrentRelease;\lib\opt copy je-&oracleBdbProductVersion;.jar qpid-broker-&qpidCurrentRelease;\lib\opt</programlisting> </section> diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Stores-Memory-Store.xml b/qpid/doc/book/src/java-broker/Java-Broker-Stores-Memory-Store.xml index 7ca50815cd..b8694f3315 100644 --- a/qpid/doc/book/src/java-broker/Java-Broker-Stores-Memory-Store.xml +++ b/qpid/doc/book/src/java-broker/Java-Broker-Stores-Memory-Store.xml @@ -32,7 +32,7 @@ ability to store new messages will be entirely constrained by the JVM heap size. </para> - <section role="h3" id="Java-Broker-Stores-Derby-Store-Configuration"> + <section role="h3" id="Java-Broker-Stores-Memory-Store-Configuration"> <title>Configuration</title> <para> In order to use the MemoryMessageStore, you must configure it for each VirtualHost desired by updating the store element diff --git a/qpid/doc/book/src/java-broker/commonEntities.xml b/qpid/doc/book/src/java-broker/commonEntities.xml index af95997f77..4dcbc96b69 100644 --- a/qpid/doc/book/src/java-broker/commonEntities.xml +++ b/qpid/doc/book/src/java-broker/commonEntities.xml @@ -23,7 +23,7 @@ <!ENTITY qpidProgrammingBook "../../Programming-In-Apache-Qpid/html/"> <!ENTITY qpidCppBook "../../AMQP-Messaging-Broker-CPP-Book/html/"> -<!ENTITY qpidCurrentRelease "0.18"> +<!ENTITY qpidCurrentRelease "0.21"> <!-- Oracle javadoc --> <!ENTITY oracleJdkDocUrl "http://oracle.com/javase/6/docs/api/"> diff --git a/qpid/doc/book/src/java-broker/images/HA-BDBHAMessageStore-MBean-jconsole.png b/qpid/doc/book/src/java-broker/images/HA-BDBHAMessageStore-MBean-jconsole.png Binary files differindex 6caaacb1e1..29d5494746 100644 --- a/qpid/doc/book/src/java-broker/images/HA-BDBHAMessageStore-MBean-jconsole.png +++ b/qpid/doc/book/src/java-broker/images/HA-BDBHAMessageStore-MBean-jconsole.png diff --git a/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml b/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml index fd32f42f2e..e2f6d8756c 100644 --- a/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml +++ b/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml @@ -3087,6 +3087,22 @@ spout - -content "$(cat rdu.xml | sed -e 's/70/45/')" xml/weather </para> </entry> </row> + <row> + <entry> + ssl + </entry> + <entry> + boolean + </entry> + <entry> + <para> + If <literal>ssl='true'</literal>, use SSL for all broker connections. Overrides any per-broker settings in the brokerlist (see below) entries. If not specified, the brokerlist entry for each given broker is used to determine whether SSL is used. + </para> + <para> + Introduced in version 0.22. + </para> + </entry> + </row> </tbody> </tgroup> </table> @@ -3237,6 +3253,7 @@ spout - -content "$(cat rdu.xml | sed -e 's/70/45/')" xml/weather trust_store_password </entry> <entry> + -- </entry> <entry> Trust store password @@ -3247,6 +3264,7 @@ spout - -content "$(cat rdu.xml | sed -e 's/70/45/')" xml/weather key_store </entry> <entry> + -- </entry> <entry> path to key store @@ -3271,7 +3289,9 @@ spout - -content "$(cat rdu.xml | sed -e 's/70/45/')" xml/weather Boolean </entry> <entry> - If <literal>ssl='true'</literal>, the JMS client will encrypt the connection using SSL. + <para>If <literal>ssl='true'</literal>, the JMS client will encrypt the connection to this broker using SSL.</para> + + <para>This can also be set/overridden for all brokers using the <link linkend="section-jms-connection-url">Connection URL</link> options.</para> </entry> </row> <row> @@ -3292,7 +3312,7 @@ spout - -content "$(cat rdu.xml | sed -e 's/70/45/')" xml/weather ssl_cert_alias </entry> <entry> - + -- </entry> <entry> If multiple certificates are present in the keystore, the alias will be used to extract the correct certificate. diff --git a/qpid/extras/qmf/setup.py b/qpid/extras/qmf/setup.py index fcfd2f8a30..db62ddba99 100755 --- a/qpid/extras/qmf/setup.py +++ b/qpid/extras/qmf/setup.py @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-qmf", - version="0.19", + version="0.21", author="Apache Qpid", author_email="dev@qpid.apache.org", packages=["qmf"], diff --git a/qpid/packaging/windows/INSTALL_NOTES.html b/qpid/packaging/windows/INSTALL_NOTES.html index 66e8076767..54f427ed79 100644 --- a/qpid/packaging/windows/INSTALL_NOTES.html +++ b/qpid/packaging/windows/INSTALL_NOTES.html @@ -1,11 +1,11 @@ <html>
<head>
-<title>Apache Qpid C++ 0.19 Installation Notes</title>
+<title>Apache Qpid C++ 0.21 Installation Notes</title>
</head>
<body>
-<H1>Apache Qpid C++ 0.19 Installation Notes</H1>
+<H1>Apache Qpid C++ 0.21 Installation Notes</H1>
-<p>Thank you for installing Apache Qpid version 0.19 for Windows.
+<p>Thank you for installing Apache Qpid version 0.21 for Windows.
If the requisite features were installed, you can now run a broker,
use the example programs, and design your own messaging programs while
reading the Qpid C++ API reference documentation.</p>
@@ -83,7 +83,7 @@ default; therefore, to gain support for durable items the persistence plugin must be loaded into the broker. This can be done using the
<code>--load-module</code> option to load the needed plugins. For example:
<pre>
-cd "C:\Program Files\Apache\qpidc-0.19"
+cd "C:\Program Files\Apache\qpidc-0.21"
qpidd.exe --load-module plugins\broker\store.dll --load-module plugins\broker\msclfs_store.dll
</pre>
The <code>--load-module</code> option can also take a full path. The option
diff --git a/qpid/packaging/windows/installer.proj b/qpid/packaging/windows/installer.proj index 6f5f119f4e..b2d1d6fb2f 100644 --- a/qpid/packaging/windows/installer.proj +++ b/qpid/packaging/windows/installer.proj @@ -32,7 +32,7 @@ <source_root>$(MSBuildProjectDirectory)\..\..</source_root>
<staging_dir>$(MSBuildProjectDirectory)\stage</staging_dir>
<bits Condition="'$(bits)' == ''">32</bits>
- <qpid_version>0.19</qpid_version>
+ <qpid_version>0.21</qpid_version>
<OutputName>qpidc</OutputName>
<OutputType>Package</OutputType>
<WixToolPath>C:\Program Files (x86)\Windows Installer XML v3.5\bin</WixToolPath>
diff --git a/qpid/python/setup.py b/qpid/python/setup.py index 0b9d99a1af..56af530b43 100755 --- a/qpid/python/setup.py +++ b/qpid/python/setup.py @@ -298,7 +298,7 @@ class install_lib(_install_lib): return outfiles + extra setup(name="qpid-python", - version="0.19", + version="0.21", author="Apache Qpid", author_email="dev@qpid.apache.org", packages=["mllib", "qpid", "qpid.messaging", "qpid.tests", diff --git a/qpid/tests/setup.py b/qpid/tests/setup.py index 8d5345d56e..67d2c87ad9 100755 --- a/qpid/tests/setup.py +++ b/qpid/tests/setup.py @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-tests", - version="0.19", + version="0.21", author="Apache Qpid", author_email="dev@qpid.apache.org", packages=["qpid_tests", "qpid_tests.broker_0_10", "qpid_tests.broker_0_9", diff --git a/qpid/tools/setup.py b/qpid/tools/setup.py index c9dc21c620..438a2af14f 100755 --- a/qpid/tools/setup.py +++ b/qpid/tools/setup.py @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-tools", - version="0.19", + version="0.21", author="Apache Qpid", author_email="dev@qpid.apache.org", package_dir={'' : 'src/py'}, |