summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/configure.ac15
-rw-r--r--qpid/cpp/qpidc.spec.in1
-rwxr-xr-xqpid/cpp/rubygen/0-10/specification.rb177
-rwxr-xr-xqpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb (renamed from qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb)2
-rwxr-xr-xqpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb (renamed from qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb)2
-rwxr-xr-xqpid/cpp/rubygen/99-0/MethodHolder.rb (renamed from qpid/cpp/rubygen/templates/MethodHolder.rb)2
-rwxr-xr-xqpid/cpp/rubygen/99-0/Operations.rb (renamed from qpid/cpp/rubygen/templates/Operations.rb)4
-rwxr-xr-xqpid/cpp/rubygen/99-0/OperationsInvoker.rb (renamed from qpid/cpp/rubygen/templates/OperationsInvoker.rb)4
-rwxr-xr-xqpid/cpp/rubygen/99-0/Proxy.rb (renamed from qpid/cpp/rubygen/templates/Proxy.rb)8
-rw-r--r--qpid/cpp/rubygen/99-0/Session.rb (renamed from qpid/cpp/rubygen/templates/Session.rb)4
-rwxr-xr-xqpid/cpp/rubygen/99-0/all_method_bodies.rb (renamed from qpid/cpp/rubygen/templates/all_method_bodies.rb)2
-rwxr-xr-xqpid/cpp/rubygen/99-0/constants.rb (renamed from qpid/cpp/rubygen/templates/constants.rb)2
-rw-r--r--qpid/cpp/rubygen/99-0/frame_body_lists.rb (renamed from qpid/cpp/rubygen/templates/frame_body_lists.rb)2
-rw-r--r--qpid/cpp/rubygen/99-0/structs.rb (renamed from qpid/cpp/rubygen/templates/structs.rb)2
-rwxr-xr-xqpid/cpp/rubygen/MethodBodyDefaultVisitor.rb2
-rwxr-xr-xqpid/cpp/rubygen/amqpgen.rb139
-rwxr-xr-xqpid/cpp/rubygen/cppgen.rb104
-rwxr-xr-xqpid/cpp/rubygen/generate41
-rw-r--r--qpid/cpp/src/Makefile.am14
-rw-r--r--qpid/cpp/src/qpid/Exception.cpp27
-rw-r--r--qpid/cpp/src/qpid/Exception.h25
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/built_in_types.h23
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/helpers.cpp30
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/helpers.h67
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/visitors.h15
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h3
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.h2
-rw-r--r--qpid/cpp/src/qpid/broker/HandlerImpl.h6
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp20
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionHandler.h10
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp112
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionManager.h100
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionState.cpp169
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionState.h124
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.h8
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp31
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h8
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h9
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp63
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h15
-rw-r--r--qpid/cpp/src/qpid/broker/SessionManager.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionManager.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp128
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h48
-rw-r--r--qpid/cpp/src/qpid/client/Session.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h1
-rw-r--r--qpid/cpp/src/qpid/framing/Proxy.h1
-rw-r--r--qpid/cpp/src/tests/exception_test.cpp2
-rw-r--r--qpid/cpp/src/tests/serialize.cpp4
-rw-r--r--qpid/cpp/xml/cluster.xml2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java64
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java74
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java45
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java33
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java36
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java62
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java78
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java61
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java195
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java445
-rw-r--r--qpid/java/module.xml3
-rw-r--r--qpid/python/mllib/dom.py8
-rw-r--r--qpid/python/qpid/queue.py47
-rw-r--r--qpid/python/tests/queue.py41
76 files changed, 2345 insertions, 548 deletions
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac
index 3d68ed7890..c17051d431 100644
--- a/qpid/cpp/configure.ac
+++ b/qpid/cpp/configure.ac
@@ -122,11 +122,11 @@ test -n "$RUBY" && generate=yes
test -z "$RUBY" && AC_MSG_ERROR([Missing ruby installation (try "yum install ruby").])
specdir=`pwd`/$srcdir/../specs
-AMQP_XML=$specdir/amqp.0-10-preview.xml
-AC_SUBST(AMQP_XML)
-ls $AMQP_XML >/dev/null 2>&1 || generate=no
-
-AM_CONDITIONAL([GENERATE], [test x$generate = xyes])
+AMQP_PREVIEW_XML=$specdir/amqp.0-10-preview.xml
+AMQP_FINAL_XML=$specdir/amqp.0-10.xml
+AC_SUBST(AMQP_PREVIEW_XML)
+AC_SUBST(AMQP_FINAL_XML)
+AM_CONDITIONAL([GENERATE], [ls $AMQP_FINAL_XML >/dev/null])
# URL and download URL for the package.
URL=http://rhm.et.redhat.com/qpidc
@@ -139,7 +139,6 @@ AC_CHECK_HEADERS([boost/shared_ptr.hpp uuid/uuid.h],,
AC_MSG_ERROR([Missing required header files.]))
# Check for optional CPG requirement.
-save_ldflags=$LDFLAGS
LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais"
AC_ARG_WITH([cpg],
@@ -157,15 +156,13 @@ AC_ARG_WITH([cpg],
esac],
[ # not specified - enable if libs/headers available.
with_CPG=yes
- AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no])
AC_CHECK_HEADERS([openais/cpg.h],,[with_CPG=no])
+ AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no])
]
)
AM_CONDITIONAL([CPG], [test x$with_CPG = xyes])
if test x$with_CPG = xyes; then
CPPFLAGS+=" -DCPG"
-else
- LDFLAGS=$save_ldflags
fi
# Files to generate
diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in
index 167247f67b..fd4a24b2e5 100644
--- a/qpid/cpp/qpidc.spec.in
+++ b/qpid/cpp/qpidc.spec.in
@@ -104,6 +104,7 @@ make check
%files devel
%defattr(-,root,root,-)
%_includedir/qpid/*.h
+%_includedir/qpid/amqp_0_10
%_includedir/qpid/client
%_includedir/qpid/framing
%_includedir/qpid/sys
diff --git a/qpid/cpp/rubygen/0-10/specification.rb b/qpid/cpp/rubygen/0-10/specification.rb
new file mode 100755
index 0000000000..026b49e1a9
--- /dev/null
+++ b/qpid/cpp/rubygen/0-10/specification.rb
@@ -0,0 +1,177 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class Specification < CppGen
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @ns="qpid::amqp_#{@amqp.version.bars}"
+ @dir="qpid/amqp_#{@amqp.version.bars}"
+ end
+
+ # domains
+
+ def domain_h(d)
+ genl
+ typename=d.name.typename
+ if d.enum
+ scope("enum #{typename} {", "};") {
+ genl d.enum.choices.map { |c|
+ "#{c.name.constname} = #{c.value}" }.join(",\n")
+ }
+ elsif (d.type_ == "array")
+ genl "typedef Array<#{ArrayTypes[d.name].amqp2cpp}> #{typename};"
+ else
+ genl "typedef #{d.type_.amqp2cpp} #{typename};"
+ end
+ end
+
+ # class constants
+
+ def class_h(c)
+ genl "const uint8_t CODE=#{c.code};"
+ genl "extern const char* NAME;"
+ end
+
+ def class_cpp(c)
+ genl "const char* NAME=\"#{c.fqname}\";"
+ end
+
+ # Used by structs, commands and controls.
+ def action_struct_h(x, base, consts, &block)
+ genl
+ struct(x.classname, "public #{base}") {
+ x.fields.each { |f| genl "#{f.type_.amqp2cpp} #{f.cppname};" }
+ genl
+ genl "static const char* NAME;"
+ consts.each { |c| genl "static const uint8_t #{c.upcase}=#{x.send c or 0};"}
+ genl "static const uint8_t CLASS_CODE=#{x.containing_class.nsname}::CODE;"
+ genl
+ genl "#{x.classname}();"
+ scope("#{x.classname}(",");") { genl x.parameters } unless x.fields.empty?
+ genl
+ genl "void accept(Visitor&) const;"
+ genl
+ yield if block
+ }
+ end
+
+ def action_struct_cpp(x)
+ genl
+ genl "const char* #{x.classname}::NAME=\"#{x.fqname}\";"
+ genl
+ genl "#{x.classname}::#{x.classname}() {}";
+ genl
+ if not x.fields.empty?
+ scope("#{x.classname}::#{x.classname}(",") :") { genl x.parameters }
+ indent() { genl x.initializers }
+ genl "{}"
+ genl
+ end
+ scope("void #{x.classname}::accept(Visitor&) const {","}") {
+ genl "// FIXME aconway 2008-02-27: todo"
+ }
+ end
+
+ # structs
+
+ def struct_h(s) action_struct_h(s, "Struct", ["size","pack","code"]); end
+ def struct_cpp(s) action_struct_cpp(s) end
+
+ # command and control
+
+ def action_h(a)
+ action_struct_h(a, a.base, ["code"]) {
+ scope("template <class T> void invoke(T& target) {","}") {
+ scope("target.#{a.funcname}(", ");") { genl a.values }
+ }
+ genl
+ scope("template <class S> void serialize(S& s) {","}") {
+ gen "s"
+ a.fields.each { |f| gen "(#{f.cppname})"}
+ genl ";"
+ } unless a.fields.empty?
+ }
+ end
+
+ def action_cpp(a) action_struct_cpp(a); end
+
+ # Types that must be generated early because they are used by other types.
+ def pregenerate?(x) not @amqp.used_by[x.fqname].empty?; end
+
+ # Generate the log
+ def gen_specification()
+ h_file("#{@dir}/specification") {
+ include "#{@dir}/built_in_types"
+ include "#{@dir}/helpers"
+ include "<boost/call_traits.hpp>"
+ genl "using boost::call_traits;"
+ namespace(@ns) {
+ # Top level
+ @amqp.domains.each { |d|
+ # segment-type and track are are built in
+ domain_h d unless ["track","segment-type"].include?(d.name)
+ }
+ puts @amqp.used_by.inspect
+
+ # Domains and structs that must be generated early because
+ # they are used by other definitions:
+ each_class_ns { |c|
+ class_h c
+ c.domains.each { |d| domain_h d if pregenerate? d }
+ c.structs.each { |s| struct_h s if pregenerate? s }
+ }
+ # Now dependent domains/structs and actions
+ each_class_ns { |c|
+ c.domains.each { |d| domain_h d if not pregenerate? d }
+ c.structs.each { |s| struct_h s if not pregenerate? s }
+ c.actions.each { |a| action_h a }
+ }
+ }
+ }
+
+ cpp_file("#{@dir}/specification") {
+ include "#{@dir}/specification"
+ namespace(@ns) {
+ each_class_ns { |c|
+ class_cpp c
+ c.actions.each { |a| action_cpp a}
+ c.structs.each { |s| struct_cpp s }
+ }
+ }
+ }
+ end
+
+ def gen_proxy()
+ h_file("#{@dir}/Proxy.h") {
+ include "#{@dir}/specification"
+ namespace(@ns) {
+ genl "template <class F, class R=F::result_type>"
+ cpp_class("ProxyTemplate") {
+ public
+ genl "ProxyTemplate(F f) : functor(f) {}"
+ @amqp.classes.each { |c|
+ c.actions.each { |a|
+ scope("R #{a.funcname}(", ")") { genl a.parameters }
+ scope() {
+ var=a.name.funcname
+ scope("#{a.classname} #{var}(",");") { genl a.arguments }
+ genl "return functor(#{var});"
+ }
+ }
+ }
+ private
+ genl "F functor;"
+ }
+ }
+ }
+ end
+
+ def generate
+ gen_specification
+ gen_proxy
+ end
+end
+
+Specification.new($outdir, $amqp).generate();
+
diff --git a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb
index 18f74a2e41..f9ef95f5a0 100755
--- a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb
+++ b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb
@@ -23,5 +23,5 @@ class MethodBodyConstVisitorGen < CppGen
end
end
-MethodBodyConstVisitorGen.new(Outdir, Amqp).generate();
+MethodBodyConstVisitorGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb
index 4944b10a84..a74b0c06d6 100755
--- a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb
+++ b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb
@@ -31,5 +31,5 @@ class MethodBodyDefaultVisitorGen < CppGen
end
end
-MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate();
+MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/MethodHolder.rb b/qpid/cpp/rubygen/99-0/MethodHolder.rb
index 7e915677b2..a708db6676 100755
--- a/qpid/cpp/rubygen/templates/MethodHolder.rb
+++ b/qpid/cpp/rubygen/99-0/MethodHolder.rb
@@ -96,5 +96,5 @@ EOS
end
end
-MethodHolderGen.new(Outdir, Amqp).generate();
+MethodHolderGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/Operations.rb b/qpid/cpp/rubygen/99-0/Operations.rb
index 91007ef3e1..c985bb6105 100755
--- a/qpid/cpp/rubygen/templates/Operations.rb
+++ b/qpid/cpp/rubygen/99-0/Operations.rb
@@ -91,6 +91,6 @@ EOS
end
end
-OperationsGen.new("client",ARGV[0], Amqp).generate()
-OperationsGen.new("server",ARGV[0], Amqp).generate()
+OperationsGen.new("client",ARGV[0], $amqp).generate()
+OperationsGen.new("server",ARGV[0], $amqp).generate()
diff --git a/qpid/cpp/rubygen/templates/OperationsInvoker.rb b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb
index 747dd06189..642f98ce8e 100755
--- a/qpid/cpp/rubygen/templates/OperationsInvoker.rb
+++ b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb
@@ -88,5 +88,5 @@ class OperationsInvokerGen < CppGen
end
end
-OperationsInvokerGen.new("client",ARGV[0], Amqp).generate()
-OperationsInvokerGen.new("server",ARGV[0], Amqp).generate()
+OperationsInvokerGen.new("client",ARGV[0], $amqp).generate()
+OperationsInvokerGen.new("server",ARGV[0], $amqp).generate()
diff --git a/qpid/cpp/rubygen/templates/Proxy.rb b/qpid/cpp/rubygen/99-0/Proxy.rb
index 467476506c..2829884673 100755
--- a/qpid/cpp/rubygen/templates/Proxy.rb
+++ b/qpid/cpp/rubygen/99-0/Proxy.rb
@@ -62,7 +62,9 @@ EOS
include "<sstream>"
include "#{@classname}.h"
include "qpid/framing/amqp_types_full.h"
- Amqp.methods_on(@chassis).each { |m| include "qpid/framing/"+m.body_name }
+ @amqp.methods_on(@chassis).each {
+ |m| include "qpid/framing/"+m.body_name
+ }
genl
namespace("qpid::framing") {
genl "#{@classname}::#{@classname}(FrameHandler& f) :"
@@ -75,6 +77,6 @@ EOS
end
-ProxyGen.new("client", Outdir, Amqp).generate;
-ProxyGen.new("server", Outdir, Amqp).generate;
+ProxyGen.new("client", $outdir, $amqp).generate;
+ProxyGen.new("server", $outdir, $amqp).generate;
diff --git a/qpid/cpp/rubygen/templates/Session.rb b/qpid/cpp/rubygen/99-0/Session.rb
index 6a50fdb462..e01a28a62d 100644
--- a/qpid/cpp/rubygen/templates/Session.rb
+++ b/qpid/cpp/rubygen/99-0/Session.rb
@@ -190,6 +190,6 @@ EOS
end
end
-SessionNoKeywordGen.new(ARGV[0], Amqp).generate()
-SessionGen.new(ARGV[0], Amqp).generate()
+SessionNoKeywordGen.new(ARGV[0], $amqp).generate()
+SessionGen.new(ARGV[0], $amqp).generate()
diff --git a/qpid/cpp/rubygen/templates/all_method_bodies.rb b/qpid/cpp/rubygen/99-0/all_method_bodies.rb
index d06f459493..5971d49189 100755
--- a/qpid/cpp/rubygen/templates/all_method_bodies.rb
+++ b/qpid/cpp/rubygen/99-0/all_method_bodies.rb
@@ -17,5 +17,5 @@ class AllMethodBodiesGen < CppGen
end
end
-AllMethodBodiesGen.new(Outdir, Amqp).generate();
+AllMethodBodiesGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/constants.rb b/qpid/cpp/rubygen/99-0/constants.rb
index 5fbbefe218..b5f559d504 100755
--- a/qpid/cpp/rubygen/templates/constants.rb
+++ b/qpid/cpp/rubygen/99-0/constants.rb
@@ -78,5 +78,5 @@ class ConstantsGen < CppGen
end
end
-ConstantsGen.new(Outdir, Amqp).generate();
+ConstantsGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/frame_body_lists.rb b/qpid/cpp/rubygen/99-0/frame_body_lists.rb
index 634001ab14..b20e4550f3 100644
--- a/qpid/cpp/rubygen/templates/frame_body_lists.rb
+++ b/qpid/cpp/rubygen/99-0/frame_body_lists.rb
@@ -26,6 +26,6 @@ EOS
end
end
-FrameBodyListsGen.new(ARGV[0], Amqp).generate;
+FrameBodyListsGen.new(ARGV[0], $amqp).generate;
diff --git a/qpid/cpp/rubygen/templates/structs.rb b/qpid/cpp/rubygen/99-0/structs.rb
index edbadb01f6..336591be00 100644
--- a/qpid/cpp/rubygen/templates/structs.rb
+++ b/qpid/cpp/rubygen/99-0/structs.rb
@@ -534,5 +534,5 @@ EOS
end
end
-StructGen.new(ARGV[0], Amqp).generate()
+StructGen.new(ARGV[0], $amqp).generate()
diff --git a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb
index d1212153ca..1fff1d51db 100755
--- a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb
+++ b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb
@@ -30,5 +30,5 @@ class MethodBodyDefaultVisitorGen < CppGen
end
end
-MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate();
+MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb
index f998909ec8..b1e635a27b 100755
--- a/qpid/cpp/rubygen/amqpgen.rb
+++ b/qpid/cpp/rubygen/amqpgen.rb
@@ -119,7 +119,16 @@ class AmqpElement
# List of children of type elname, or all children if elname
# not specified.
def children(elname=nil)
- @cache_children[elname] ||= @children.select { |c| elname==c.xml.name }
+ if elname
+ @cache_children[elname] ||= @children.select { |c| elname==c.xml.name }
+ else
+ @children
+ end
+ end
+
+ def each_descendant(&block)
+ yield self
+ @children.each { |c| c.each_descendant(&block) }
end
# Look up child of type elname with attribute name.
@@ -127,6 +136,9 @@ class AmqpElement
@cache_child[[elname,name]] ||= children(elname).find { |c| c.name==name }
end
+ # Fully qualified amqp dotted name of this element
+ def dotted_name() (parent ? parent.dotted_name+"." : "") + name; end
+
# The root <amqp> element.
def root() @root ||=parent ? parent.root : self; end
@@ -140,9 +152,22 @@ class AmqpElement
# Text of doc child if there is one.
def doc() d=xml.elements["doc"]; d and d.text; end
+ def fqname()
+ throw "fqname: #{self} #{parent.fqname} has no name" unless name
+ p=parent && parent.fqname
+ p ? p+"."+name : name;
+ end
+
+ def containing_class()
+ return self if is_a? AmqpClass
+ return parent && parent.containing_class
+ end
+
end
-AmqpResponse = AmqpElement
+class AmqpResponse < AmqpElement
+ def initialize(xml, parent) super; end
+end
class AmqpDoc < AmqpElement
def initialize(xml,parent) super; end
@@ -153,18 +178,32 @@ class AmqpChoice < AmqpElement
def initialize(xml,parent) super; end
amqp_attr_reader :name, :value
end
-
+
class AmqpEnum < AmqpElement
def initialize(xml,parent) super; end
amqp_child_reader :choice
end
+# 0-10 array domains are missing element type information, add it here.
+ArrayTypes={
+ "str16-array" => "str-16",
+ "amqp-host-array" => "connection.amqp-host-url",
+ "command-fragments" => "session.command-fragment",
+ "in-doubt" => "dtx.xid"
+}
+
class AmqpDomain < AmqpElement
- def initialize(xml, parent) super; end
+ def initialize(xml, parent)
+ super
+ root.used_by[uses].push(fqname) if uses and uses.index('.')
+ end
+
amqp_attr_reader :type
amqp_single_child_reader :struct # preview
amqp_single_child_reader :enum
+ def uses() type_=="array" ? ArrayTypes[name] : type_; end
+
def unalias()
d=self
while (d.type_ != d.name and root.domain(d.type_))
@@ -180,10 +219,13 @@ class AmqpException < AmqpElement
end
class AmqpField < AmqpElement
- def initialize(xml, amqp) super; end;
+ def initialize(xml, amqp)
+ super;
+ root.used_by[type_].push(parent.fqname) if type_ and type_.index('.')
+ end
+
def domain() root.domain(xml.attributes["domain"]); end
amqp_single_child_reader :struct # preview
- # FIXME aconway 2008-02-21: exceptions in fields - need to update c++ mapping.
amqp_child_reader :exception
amqp_attr_reader :type, :default, :code, :required
end
@@ -198,15 +240,11 @@ class AmqpConstant < AmqpElement
amqp_attr_reader :value, :class
end
-# FIXME aconway 2008-02-21:
-# class AmqpResponse < AmqpElement
-# def initialize(xml, parent) super; end
-# end
-
class AmqpResult < AmqpElement
def initialize(xml, parent) super; end
amqp_single_child_reader :struct # preview
amqp_attr_reader :type
+ def name() "result"; end
end
class AmqpEntry < AmqpElement
@@ -236,8 +274,8 @@ class AmqpStruct < AmqpElement
amqp_attr_reader :size, :code, :pack
amqp_child_reader :field
- alias :raw_pack :pack
# preview - preview code needs default "short" for pack.
+ alias :raw_pack :pack
def pack() raw_pack or (not parent.final? and "short"); end
def result?() parent.xml.name == "result"; end
def domain?() parent.xml.name == "domain"; end
@@ -265,17 +303,21 @@ class AmqpRole < AmqpElement
amqp_attr_reader :implement
end
-class AmqpControl < AmqpElement
+# Base class for command and control.
+class AmqpAction < AmqpElement
def initialize(xml,amqp) super; end
amqp_child_reader :implement, :field, :response
amqp_attr_reader :code
end
-class AmqpCommand < AmqpElement
+class AmqpControl < AmqpAction
+ def initialize(xml,amqp) super; end
+end
+
+class AmqpCommand < AmqpAction
def initialize(xml,amqp) super; end
- amqp_child_reader :implement, :field, :exception, :response
+ amqp_child_reader :exception
amqp_single_child_reader :result, :segments
- amqp_attr_reader :code
end
class AmqpClass < AmqpElement
@@ -296,12 +338,19 @@ class AmqpClass < AmqpElement
def l4?() # preview
!["connection", "session", "execution"].include?(name)
end
+
+ def actions() controls+commands; end
end
class AmqpType < AmqpElement
+ def initialize(xml,amqp) super; end
amqp_attr_reader :code, :fixed_width, :variable_width
end
+class AmqpXref < AmqpElement
+ def initialize(xml,amqp) super; end
+end
+
# AMQP root element.
class AmqpRoot < AmqpElement
amqp_attr_reader :major, :minor, :port, :comment
@@ -314,14 +363,16 @@ class AmqpRoot < AmqpElement
raise "No XML spec files." if specs.empty?
xml=parse(specs.shift)
specs.each { |s| xml_merge(xml, parse(s)) }
+ @used_by=Hash.new{ |h,k| h[k]=[] }
super(xml, nil)
end
+ attr_reader :used_by
+
+ def merge(root) xml_merge(xml, root.xml); end
+
def version() major + "-" + minor; end
- # Find a child node from a dotted amqp name, e.g. message.transfer
- def lookup(dotted_name) elements[dotted_name.gsub(/\./,"/")]; end
-
# preview - only struct child reader remains for new mapping
def domain_structs() domains.map{ |d| d.struct }.compact; end
def result_structs()
@@ -337,6 +388,8 @@ class AmqpRoot < AmqpElement
@methods_on[chassis] ||= classes.map { |c| c.methods_on(chassis) }.flatten
end
+ def fqname() nil; end
+
# TODO aconway 2008-02-21: methods by role.
private
@@ -353,7 +406,7 @@ end
# Collect information about generated files.
class GenFiles
@@files =[]
- def GenFiles.add(f) @@files << f; puts f; end
+ def GenFiles.add(f) @@files << f; end
def GenFiles.get() @@files; end
end
@@ -366,26 +419,38 @@ class Generator
def initialize (outdir, amqp)
@amqp=amqp
@outdir=outdir
- @prefix='' # For indentation or comments.
+ @prefix=[''] # For indentation or comments.
@indentstr=' ' # One indent level.
@outdent=2
- Pathname.new(@outdir).mkpath unless @outdir=="-" or File.directory?(@outdir)
+ Pathname.new(@outdir).mkpath unless @outdir=="-"
end
# Create a new file, set @out.
- def file(file)
+ def file(file, &block)
GenFiles.add file
if (@outdir != "-")
- path=Pathname.new "#{@outdir}/#{file}"
- path.parent.mkpath
- path.open('w') { |@out| yield }
+ @path=Pathname.new "#{@outdir}/#{file}"
+ @path.parent.mkpath
+ @out=String.new # Generate in memory first
+ if block then yield; endfile; end
+ end
+ end
+
+ def endfile()
+ if @outdir != "-"
+ if @path.exist? and @path.read == @out
+ puts "Skipped #{@path} - unchanged" # Dont generate if unchanged
+ else
+ @path.open('w') { |f| f << @out }
+ puts "Generated #{@path}"
+ end
end
end
# Append multi-line string to generated code, prefixing each line.
def gen (str)
str.each_line { |line|
- @out << @prefix unless @midline
+ @out << @prefix.last unless @midline
@out << line
@midline = nil
}
@@ -400,23 +465,25 @@ class Generator
end
# Generate code with added prefix.
- def prefix(add)
- save=@prefix
- @prefix+=add
- yield
- @prefix=save
+ def prefix(add, &block)
+ @prefix.push @prefix.last+add
+ if block then yield; endprefix; end
+ end
+
+ def endprefix()
+ @prefix.pop
end
# Generate indented code
def indent(n=1,&block) prefix(@indentstr * n,&block); end
+ alias :endindent :endprefix
# Generate outdented code
def outdent(&block)
- save=@prefix
- @prefix=@prefix[0...-2]
- yield
- @prefix=save
+ @prefix.push @prefix.last[0...-2]
+ if block then yield; endprefix; end
end
+ alias :endoutdent :endprefix
attr_accessor :out
end
diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb
index 60a653e18d..edee72e0bd 100755
--- a/qpid/cpp/rubygen/cppgen.rb
+++ b/qpid/cpp/rubygen/cppgen.rb
@@ -54,12 +54,31 @@ CppKeywords = Set.new(["and", "and_eq", "asm", "auto", "bitand",
CppMangle = CppKeywords+Set.new(["string"])
class String
- def cppsafe()
- CppMangle.include?(self) ? self+"_" : self
+ def cppsafe() CppMangle.include?(self) ? self+"_" : self; end
+
+ def amqp2cpp()
+ path=split(".")
+ name=path.pop
+ return name.typename if path.empty?
+ path.map! { |n| n.nsname }
+ return (path << name.caps).join("::")
end
+
+ alias :typename :caps
+ alias :nsname :bars
+ alias :constname :shout
+ alias :funcname :lcaps
+ alias :varname :lcaps
end
# Hold information about a C++ type.
+#
+# preview - new mapping does not use CppType,
+# Each amqp type corresponds exactly by dotted name
+# to a type, domain or struct, which in turns
+# corresponds by name to a C++ type or typedef.
+# (see String.amqp2cpp)
+#
class CppType
def initialize(name) @name=@param=@ret=name; end
attr_reader :name, :param, :ret, :code
@@ -93,11 +112,22 @@ class CppType
def to_s() name; end;
end
+class AmqpElement
+ def cppfqname()
+ names=parent.dotted_name.split(".")
+ # Field children are moved up to method in C++b
+ prefix.pop if parent.is_a? AmqpField
+ prefix.push cppname
+ prefix.join("::")
+ end
+end
+
class AmqpField
def cppname() name.lcaps.cppsafe; end
def cpptype() domain.cpptype; end
def bit?() domain.type_ == "bit"; end
def signature() cpptype.param+" "+cppname; end
+ def paramtype() "call_traits<#{type_.amqp2cpp}>::param_type"; end
end
class AmqpMethod
@@ -107,8 +137,41 @@ class AmqpMethod
def body_name() parent.name.caps+name.caps+"Body"; end
end
+module AmqpHasFields
+ def parameters()
+ fields.map { |f| "#{f.paramtype} #{f.cppname}_"}.join(",\n")
+ end
+
+ def arguments()
+ fields.map { |f| "#{f.cppname}_"}.join(",\n")
+ end
+
+ def values()
+ fields.map { |f| "#{f.cppname}"}.join(",\n")
+ end
+
+ def initializers()
+ fields.map { |f| "#{f.cppname}(#{f.cppname}_)"}.join(",\n")
+ end
+end
+
+class AmqpAction
+ def classname() name.typename; end
+ def funcname() parent.name.funcname + name.caps; end
+ include AmqpHasFields
+end
+
+class AmqpCommand < AmqpAction
+ def base() "Command"; end
+end
+
+class AmqpControl < AmqpAction
+ def base() "Control"; end
+end
+
class AmqpClass
- def cppname() name.caps; end
+ def cppname() name.caps; end # preview
+ def nsname() name.nsname; end
end
class AmqpDomain
@@ -150,18 +213,26 @@ class AmqpResult
end
class AmqpStruct
- def cpp_pack_type() AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t"); end
- def cpptype() parent.cpptype; end
- def cppname() cpptype.name; end
+ include AmqpHasFields
+
+ def cpp_pack_type() # preview
+ AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t");
+ end
+ def cpptype() parent.cpptype; end # preview
+ def cppname() cpptype.name; end # preview
+
+ def classname() name.typename; end
end
class CppGen < Generator
def initialize(outdir, *specs)
super(outdir,*specs)
+ # need to sort classes for dependencies
+ @actions=[] # Stack of end-scope actions
end
# Write a header file.
- def h_file(path)
+ def h_file(path, &block)
path = (/\.h$/ === path ? path : path+".h")
guard=path.upcase.tr('./-','_')
file(path) {
@@ -169,12 +240,12 @@ class CppGen < Generator
gen "#define #{guard}\n"
gen Copyright
yield
- gen "#endif /*!#{guard}*/\n"
+ gen "#endif /*!#{guard}*/\n"
}
end
# Write a .cpp file.
- def cpp_file(path)
+ def cpp_file(path, &block)
path = (/\.cpp$/ === path ? path : path+".cpp")
file(path) do
gen Copyright
@@ -188,10 +259,12 @@ class CppGen < Generator
genl "#include #{header}"
end
- def scope(open="{",close="}", &block)
- genl open; indent(&block); genl close
+ def scope(open="{",close="}", &block)
+ genl open
+ indent &block
+ genl close
end
-
+
def namespace(name, &block)
genl
names = name.split("::")
@@ -210,7 +283,7 @@ class CppGen < Generator
indent { gen "#{bases.join(",\n")}" }
end
genl
- scope("{","};") { yield }
+ scope("{","};", &block)
end
def struct(name, *bases, &block)
@@ -242,6 +315,11 @@ class CppGen < Generator
prefix(" * ",&block)
genl " */"
end
+
+ # Generate code in namespace for each class
+ def each_class_ns()
+ @amqp.classes.each { |c| namespace(c.nsname) { yield c } }
+ end
end
# Fully-qualified class name
diff --git a/qpid/cpp/rubygen/generate b/qpid/cpp/rubygen/generate
index 94b194aaa4..d094be4f41 100755
--- a/qpid/cpp/rubygen/generate
+++ b/qpid/cpp/rubygen/generate
@@ -1,5 +1,6 @@
#!/usr/bin/env ruby
require 'amqpgen'
+require 'pathname'
#
# Run a set of code generation templates.
@@ -18,17 +19,39 @@ EOS
exit 1
end
-Outdir=ARGV[0]
-Specs=ARGV.grep(/\.xml$/)
-Amqp=AmqpRoot.new(*Specs)
+# Create array of specs by version
+def parse_specs(specs)
+ roots={ }
+ specs.each { |spec|
+ root=AmqpRoot.new(spec)
+ ver=root.version
+ if (roots[ver])
+ roots[ver].merge(root)
+ else
+ roots[ver]=root
+ end
+ }
+ roots
+end
# Run selected templates
if ARGV.any? { |arg| arg=="all" }
- templates=Dir["#{File.dirname __FILE__}/templates/*.rb"]
+ templates=Dir["#{File.dirname __FILE__}/*/*.rb"]
else
templates=ARGV.grep(/\.rb$/)
end
-templates.each { |t| load t }
+
+$outdir=ARGV[0]
+$models=parse_specs(ARGV.grep(/\.xml$/))
+templates.each { |t|
+ ver=Pathname.new(t).dirname.basename.to_s
+ $amqp=$models[ver]
+ if $amqp
+ load t
+ else
+ puts "Warning: skipping #{t}, no spec file for version #{ver}."
+ end
+}
def make_continue(lines) lines.join(" \\\n "); end
@@ -40,7 +63,7 @@ if makefile
generator_files=Dir["**/*.rb"] << File.basename(__FILE__)
Dir.chdir dir
rgen_generator=generator_files.map{ |f| "$(rgen_dir)/#{f}" }
- rgen_srcs=GenFiles.get.map{ |f| "#{Outdir}/#{f}" }
+ rgen_srcs=GenFiles.get.map{ |f| "#{$outdir}/#{f}" }
File.open(makefile, 'w') { |out|
out << <<EOS
@@ -51,13 +74,13 @@ rgen_generator=#{make_continue rgen_generator}
rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))}
-rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))}
+rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r{qpid/(framing|amqp_.+)/.+\.cpp$}))}
rgen_srcs=#{make_continue rgen_srcs}
# Header file install rules.
EOS
- ["framing", "client/no_keyword","client", "broker"].each { |ns|
+ ["amqp_0_10", "framing", "client/no_keyword","client", "broker"].each { |ns|
dir="qpid/#{ns}"
dir_ = dir.tr("/", "_")
regex=%r|#{dir}/[^/]+\.h$|
@@ -69,7 +92,7 @@ EOS
}
out << <<EOS
if GENERATE
-$(rgen_srcs) $(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs)
+$(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs)
$(rgen_cmd)
# Empty rule in case a generator file is renamed/removed.
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index b44aaa7e5e..e3b95e045f 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -11,11 +11,13 @@ EXTRA_DIST= $(platform_dist)
# This phony target is needed by generated makefile fragments:
force:
-# AMQP_XML is defined in ../configure.ac
-specs=@AMQP_XML@ $(top_srcdir)/xml/cluster.xml
-
if GENERATE
+# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac
+amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml
+amqp_0_10_xml=@AMQP_FINAL_XML@
+specs=$(amqp_99_0_xml) $(amqp_0_10_xml)
+
# Ruby generator.
rgen_dir=$(top_srcdir)/rubygen
rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate $(srcdir)/gen $(specs) all $(srcdir)/rubygen.mk
@@ -101,6 +103,7 @@ libqpidcommon_la_LIBADD = \
libqpidcommon_la_SOURCES = \
$(rgen_common_cpp) \
$(platform_src) \
+ qpid/amqp_0_10/helpers.cpp \
qpid/Serializer.h \
qpid/amqp_0_10/built_in_types.h \
qpid/amqp_0_10/Codec.h \
@@ -166,6 +169,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PreviewConnection.cpp \
qpid/broker/PreviewConnectionHandler.cpp \
qpid/broker/PreviewSessionHandler.cpp \
+ qpid/broker/PreviewSessionManager.cpp \
+ qpid/broker/PreviewSessionState.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -247,6 +252,7 @@ libqpidclient_la_SOURCES = \
nobase_include_HEADERS = \
$(platform_hdr) \
+ qpid/amqp_0_10/helpers.h \
qpid/assert.h \
qpid/DataDir.h \
qpid/Exception.h \
@@ -270,6 +276,8 @@ nobase_include_HEADERS = \
qpid/broker/PreviewConnection.h \
qpid/broker/PreviewConnectionHandler.h \
qpid/broker/PreviewSessionHandler.h \
+ qpid/broker/PreviewSessionManager.h \
+ qpid/broker/PreviewSessionState.h \
qpid/broker/Connection.h \
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionFactory.h \
diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp
index 07f157cfc3..17f0d5029c 100644
--- a/qpid/cpp/src/qpid/Exception.cpp
+++ b/qpid/cpp/src/qpid/Exception.cpp
@@ -32,20 +32,31 @@ std::string strError(int err) {
return std::string(strerror_r(err, buf, sizeof(buf)));
}
-Exception::Exception(const std::string& s) throw() : msg(s) {
- QPID_LOG(warning, "Exception: " << msg);
+Exception::Exception(const std::string& msg,
+ const std::string& nm,
+ uint16_t cd) throw()
+ : message(msg), name(nm), code(cd),
+ whatStr((name.empty() ? "" : name + ": ")+ msg)
+{
+ QPID_LOG(warning, "Exception: " << whatStr);
}
Exception::~Exception() throw() {}
-std::string Exception::str() const throw() {
- if (msg.empty())
- const_cast<std::string&>(msg).assign(typeid(*this).name());
- return msg;
+std::string Exception::getMessage() const throw() { return message; }
+
+std::string Exception::getName() const throw() {
+ return name.empty() ? typeid(*this).name() : name;
}
-const char* Exception::what() const throw() { return str().c_str(); }
+uint16_t Exception::getCode() const throw() { return code; }
+
+const char* Exception::what() const throw() {
+ if (whatStr.empty()) return typeid(*this).name();
+ else return whatStr.c_str();
+}
-const std::string ClosedException::CLOSED_MESSAGE("Closed");
+ClosedException::ClosedException(const std::string& msg)
+ : Exception(msg, "ClosedException") {}
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/Exception.h b/qpid/cpp/src/qpid/Exception.h
index 57c7a21234..00365018ba 100644
--- a/qpid/cpp/src/qpid/Exception.h
+++ b/qpid/cpp/src/qpid/Exception.h
@@ -40,13 +40,27 @@ std::string strError(int err);
class Exception : public std::exception
{
public:
- explicit Exception(const std::string& str=std::string()) throw();
+ explicit Exception(const std::string& message=std::string(),
+ const std::string& name=std::string(),
+ uint16_t code=0) throw();
+
virtual ~Exception() throw();
+
+ // returns "name: message"
+ virtual const char* what() const throw();
+
+ virtual std::string getName() const throw();
+ virtual std::string getMessage() const throw();
+ virtual uint16_t getCode() const throw();
+
+ // FIXME aconway 2008-02-21: backwards compat, remove?
+ std::string str() const throw() { return getMessage(); }
- virtual const char *what() const throw();
- virtual std::string str() const throw();
private:
- std::string msg;
+ const std::string message;
+ const std::string name;
+ const uint16_t code;
+ const std::string whatStr;
};
struct ChannelException : public Exception {
@@ -62,8 +76,7 @@ struct ConnectionException : public Exception {
};
struct ClosedException : public Exception {
- static const std::string CLOSED_MESSAGE;
- ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {}
+ ClosedException(const std::string& msg=std::string());
};
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h
index 6cd9c72367..445f07459c 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h
@@ -29,6 +29,7 @@
#include <boost/array.hpp>
#include <stdint.h>
#include <string>
+#include <vector>
/**@file Mapping from built-in AMQP types to C++ types */
@@ -66,7 +67,7 @@ typedef double Double;
typedef float Float;
typedef framing::SequenceNumber SequenceNo;
using framing::Uuid;
-typedef sys::AbsTime DateTime;
+typedef sys::AbsTime Datetime;
typedef Decimal<Uint8, Int32> Dec32;
typedef Decimal<Uint8, Int64> Dec64;
@@ -89,14 +90,18 @@ typedef CodableString<Uint16, Uint16> Str16Utf16;
typedef CodableString<Uint8, Uint32> Vbin32;
-/** FIXME aconway 2008-02-20: todo
-byte-ranges 2 byte ranges within a 64-bit payload
-sequence-set 2 ranged set representation
-map 0xa8 4 a mapping of keys to typed values
-list 0xa9 4 a series of consecutive type-value pairs
-array 0xaa 4 a defined length collection of values of a single type
-struct32 0xab 4 a coded struct with a 32-bit size
-*/
+// FIXME aconway 2008-02-26: array encoding
+template <class T> struct Array : public std::vector<T> {};
+
+// FIXME aconway 2008-02-26: Unimplemented types:
+struct ByteRanges {};
+struct SequenceSet {};
+struct Map {};
+struct List {};
+struct Struct32 {};
+
+// Top level enum definitions.
+enum SegmentType { CONTROL, COMMAND, HEADER, BODY };
}} // namespace qpid::amqp_0_10
diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp
new file mode 100644
index 0000000000..4333a2cd92
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "helpers.h"
+
+namespace qpid {
+namespace amqp_0_10 {
+
+Control::~Control() {}
+Command::~Command() {}
+Struct::~Struct() {}
+
+}} // namespace qpid::amqp_0_10
diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.h b/qpid/cpp/src/qpid/amqp_0_10/helpers.h
new file mode 100644
index 0000000000..1769d374d9
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.h
@@ -0,0 +1,67 @@
+#ifndef QPID_AMQP_0_10_HELPERS_H
+#define QPID_AMQP_0_10_HELPERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+n * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+namespace qpid {
+
+namespace amqp_0_10 {
+
+// Look up names by code
+const char* getClassName(uint8_t code);
+const char* getCommandName(uint8_t classCode, uint8_t code);
+const char* getControlName(uint8_t classCode, uint8_t code);
+const char* getStructName(uint8_t classCode, uint8_t code);
+
+struct Command {
+ virtual ~Command();
+ class Visitor;
+ virtual void accept(Visitor&) const = 0;
+};
+
+struct Control {
+ virtual ~Control();
+ class Visitor;
+ virtual void accept(Visitor&) const = 0;
+};
+
+struct Struct {
+ virtual ~Struct();
+ class Visitor;
+ virtual void accept(Visitor&) const = 0;
+};
+
+/** Base class for generated enum domains.
+ * Enums map to classes for type safety and to provide separate namespaces
+ * for clashing values.
+ */
+struct Enum {
+ int value;
+ Enum(int v=0) : value(v) {}
+ operator int() const { return value; }
+ template <class S> void serialize(S &s) { s(value); }
+};
+
+}} // namespace qpid::amqp_0_10
+
+#endif /*!QPID_AMQP_0_10_HELPERS_H*/
diff --git a/qpid/cpp/src/qpid/amqp_0_10/visitors.h b/qpid/cpp/src/qpid/amqp_0_10/visitors.h
new file mode 100644
index 0000000000..3835f37f3e
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp_0_10/visitors.h
@@ -0,0 +1,15 @@
+// Visitors
+template <class Base> struct Visitor;
+template <class Base, class F, class R> FunctorVisitor;
+
+/** Template base implementation for visitables. */
+template <class Base, class Derived>
+struct VisitableBase : public Base {
+ virtual void accept(Visitor<Derived>& v) {
+ v.visit(static_cast<Derived>&(*this));
+ }
+ virtual void accept(Visitor<Derived>& v) const {
+ v.visit(static_cast<const Derived>&(*this));
+ }
+};
+
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 0a0eb0a0df..9bfa868d9c 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -109,7 +109,8 @@ Broker::Broker(const Broker::Options& conf) :
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
factory(*this),
- sessionManager(conf.ack)
+ sessionManager(conf.ack),
+ previewSessionManager(conf.ack)
{
// Early-Initialize plugins
const Plugin::Plugins& plugins=Plugin::getPlugins();
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 55bc7644a5..153eabc6b3 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -30,6 +30,7 @@
#include "MessageStore.h"
#include "QueueRegistry.h"
#include "SessionManager.h"
+#include "PreviewSessionManager.h"
#include "Vhost.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
@@ -109,6 +110,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DataDir& getDataDir() { return dataDir; }
SessionManager& getSessionManager() { return sessionManager; }
+ PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
@@ -136,6 +138,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
+ PreviewSessionManager previewSessionManager;
management::ManagementAgent::shared_ptr managementAgent;
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h
index c8c1e12f28..ef2c51bb8d 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h
@@ -68,7 +68,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
- framing::ProtocolVersion getVersion() const { return session.getVersion();}
+ framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
AccessHandler* getAccessHandler() {
diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h
index 410d400c9d..4c51e2a826 100644
--- a/qpid/cpp/src/qpid/broker/HandlerImpl.h
+++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h
@@ -20,7 +20,7 @@
*/
#include "SemanticState.h"
-#include "SessionState.h"
+#include "SessionContext.h"
#include "ConnectionState.h"
namespace qpid {
@@ -35,13 +35,13 @@ class Broker;
class HandlerImpl {
protected:
SemanticState& state;
- SessionState& session;
+ SessionContext& session;
HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
ConnectionState& getConnection() { return session.getConnection(); }
- Broker& getBroker() { return session.getBroker(); }
+ Broker& getBroker() { return session.getConnection().getBroker(); }
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
index 19e6a235c4..36092bb7f6 100644
--- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
@@ -19,7 +19,7 @@
*/
#include "PreviewSessionHandler.h"
-#include "SessionState.h"
+#include "PreviewSessionState.h"
#include "PreviewConnection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
@@ -36,7 +36,7 @@ using namespace std;
using namespace qpid::sys;
PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
- : SessionContext(c.getOutput()),
+ : InOutHandler(0, &out),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -106,15 +106,15 @@ void PreviewSessionHandler::assertClosed(const char* method) const {
void PreviewSessionHandler::open(uint32_t detachedLifetime) {
assertClosed("open");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
+ std::auto_ptr<PreviewSessionState> state(
+ connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
peerSession.attached(session->getId(), session->getTimeout());
}
void PreviewSessionHandler::resume(const Uuid& id) {
assertClosed("resume");
- session = connection.broker.getSessionManager().resume(id);
+ session = connection.broker.getPreviewSessionManager().resume(id);
session->attach(*this);
SequenceNumber seq = session->resuming();
peerSession.attached(session->getId(), session->getTimeout());
@@ -154,7 +154,7 @@ void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText)
void PreviewSessionHandler::localSuspend() {
if (session.get() && session->isAttached()) {
session->detach();
- connection.broker.getSessionManager().suspend(session);
+ connection.broker.getPreviewSessionManager().suspend(session);
session.reset();
}
}
@@ -171,7 +171,7 @@ void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark,
const SequenceNumberSet& /*seenFrameSet*/)
{
assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
+ if (session->getState() == PreviewSessionState::RESUMING) {
session->receivedAck(cumulativeSeenMark);
framing::SessionState::Replay replay=session->replay();
std::for_each(replay.begin(), replay.end(),
@@ -193,14 +193,14 @@ void PreviewSessionHandler::solicitAck() {
void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
{
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
+ std::auto_ptr<PreviewSessionState> state(
+ connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
}
void PreviewSessionHandler::detached()
{
- connection.broker.getSessionManager().suspend(session);
+ connection.broker.getPreviewSessionManager().suspend(session);
session.reset();
}
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
index e1096ebf9f..4c517367d7 100644
--- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
@@ -36,7 +36,7 @@ namespace qpid {
namespace broker {
class PreviewConnection;
-class SessionState;
+class PreviewSessionState;
/**
* A SessionHandler is associated with each active channel. It
@@ -45,7 +45,7 @@ class SessionState;
*/
class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::AMQP_ClientOperations::SessionHandler,
- public SessionContext,
+ public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
public:
@@ -53,8 +53,8 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand
~PreviewSessionHandler();
/** Returns 0 if not attached to a session */
- SessionState* getSession() { return session.get(); }
- const SessionState* getSession() const { return session.get(); }
+ PreviewSessionState* getSession() { return session.get(); }
+ const PreviewSessionState* getSession() const { return session.get(); }
framing::ChannelId getChannel() const { return channel.get(); }
@@ -101,7 +101,7 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand
framing::AMQP_ClientProxy proxy;
framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
- std::auto_ptr<SessionState> session;
+ std::auto_ptr<PreviewSessionState> session;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
new file mode 100644
index 0000000000..ec73082817
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewSessionManager.h"
+#include "PreviewSessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/log/Helpers.h"
+#include "qpid/memory.h"
+
+#include <boost/bind.hpp>
+#include <boost/range.hpp>
+
+#include <algorithm>
+#include <functional>
+#include <ostream>
+
+namespace qpid {
+namespace broker {
+
+using namespace sys;
+using namespace framing;
+
+PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {}
+
+PreviewSessionManager::~PreviewSessionManager() {}
+
+// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
+std::auto_ptr<PreviewSessionState> PreviewSessionManager::open(
+ PreviewSessionHandler& h, uint32_t timeout_)
+{
+ Mutex::ScopedLock l(lock);
+ std::auto_ptr<PreviewSessionState> session(
+ new PreviewSessionState(this, &h, timeout_, ack));
+ active.insert(session->getId());
+ for_each(observers.begin(), observers.end(),
+ boost::bind(&Observer::opened, _1,boost::ref(*session)));
+ return session;
+}
+
+void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) {
+ Mutex::ScopedLock l(lock);
+ active.erase(session->getId());
+ session->suspend();
+ session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
+ if (session->mgmtObject.get() != 0)
+ session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
+ suspended.push_back(session.release()); // In expiry order
+ eraseExpired();
+}
+
+std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id)
+{
+ Mutex::ScopedLock l(lock);
+ eraseExpired();
+ if (active.find(id) != active.end())
+ throw SessionBusyException(
+ QPID_MSG("Session already active: " << id));
+ Suspended::iterator i = std::find_if(
+ suspended.begin(), suspended.end(),
+ boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1))
+ );
+ if (i == suspended.end())
+ throw InvalidArgumentException(
+ QPID_MSG("No suspended session with id=" << id));
+ active.insert(id);
+ std::auto_ptr<PreviewSessionState> state(suspended.release(i).release());
+ return state;
+}
+
+void PreviewSessionManager::erase(const framing::Uuid& id)
+{
+ Mutex::ScopedLock l(lock);
+ active.erase(id);
+}
+
+void PreviewSessionManager::eraseExpired() {
+ // Called with lock held.
+ if (!suspended.empty()) {
+ Suspended::iterator keep = std::lower_bound(
+ suspended.begin(), suspended.end(), now(),
+ boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2));
+ if (suspended.begin() != keep) {
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+ suspended.erase(suspended.begin(), keep);
+ }
+ }
+}
+
+void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) {
+ observers.push_back(o);
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.h b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
new file mode 100644
index 0000000000..65ca49ec89
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
@@ -0,0 +1,100 @@
+#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H
+#define QPID_BROKER_PREVIEWSESSIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/framing/Uuid.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/RefCounted.h>
+
+#include <boost/noncopyable.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <set>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+class PreviewSessionState;
+class PreviewSessionHandler;
+
+/**
+ * Create and manage PreviewSessionState objects.
+ */
+class PreviewSessionManager : private boost::noncopyable {
+ public:
+ /**
+ * Observer notified of PreviewSessionManager events.
+ */
+ struct Observer : public RefCounted {
+ virtual void opened(PreviewSessionState&) {}
+ };
+
+ PreviewSessionManager(uint32_t ack);
+
+ ~PreviewSessionManager();
+
+ /** Open a new active session, caller takes ownership */
+ std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_);
+
+ /** Suspend a session, start it's timeout counter.
+ * The factory takes ownership.
+ */
+ void suspend(std::auto_ptr<PreviewSessionState> session);
+
+ /** Resume a suspended session.
+ *@throw Exception if timed out or non-existant.
+ */
+ std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&);
+
+ /** Add an Observer. */
+ void add(const intrusive_ptr<Observer>&);
+
+ private:
+ typedef boost::ptr_vector<PreviewSessionState> Suspended;
+ typedef std::set<framing::Uuid> Active;
+ typedef std::vector<intrusive_ptr<Observer> > Observers;
+
+ void erase(const framing::Uuid&);
+ void eraseExpired();
+
+ sys::Mutex lock;
+ Suspended suspended;
+ Active active;
+ uint32_t ack;
+ Observers observers;
+
+ friend class PreviewSessionState; // removes deleted sessions from active set.
+};
+
+
+
+}} // namespace qpid::broker
+
+
+
+
+
+#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
new file mode 100644
index 0000000000..7188ffbf40
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewSessionState.h"
+#include "PreviewSessionManager.h"
+#include "PreviewSessionHandler.h"
+#include "ConnectionState.h"
+#include "Broker.h"
+#include "SemanticHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+using sys::Mutex;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+PreviewSessionState::PreviewSessionState(
+ PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack)
+ : framing::SessionState(ack, timeout_ > 0),
+ factory(f), handler(h), id(true), timeout(timeout_),
+ broker(h->getConnection().broker),
+ version(h->getConnection().getVersion()),
+ semanticHandler(new SemanticHandler(*this))
+{
+ in.next = semanticHandler.get();
+ out.next = &handler->out;
+
+ getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Session::shared_ptr
+ (new management::Session (this, parent, id.str ()));
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h->getChannel());
+ mgmtObject->set_detachedLifespan (getTimeout());
+ agent->addObject (mgmtObject);
+ }
+ }
+}
+
+PreviewSessionState::~PreviewSessionState() {
+ // Remove ID from active session list.
+ if (factory)
+ factory->erase(getId());
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+PreviewSessionHandler* PreviewSessionState::getHandler() {
+ return handler;
+}
+
+AMQP_ClientProxy& PreviewSessionState::getProxy() {
+ assert(isAttached());
+ return getHandler()->getProxy();
+}
+
+ConnectionState& PreviewSessionState::getConnection() {
+ assert(isAttached());
+ return getHandler()->getConnection();
+}
+
+void PreviewSessionState::detach() {
+ getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+ Mutex::ScopedLock l(lock);
+ handler = 0; out.next = 0;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (0);
+ }
+}
+
+void PreviewSessionState::attach(PreviewSessionHandler& h) {
+ {
+ Mutex::ScopedLock l(lock);
+ handler = &h;
+ out.next = &handler->out;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
+ }
+ }
+ h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+}
+
+void PreviewSessionState::activateOutput()
+{
+ Mutex::ScopedLock l(lock);
+ if (isAttached()) {
+ getConnection().outputTasks.activateOutput();
+ }
+}
+ //This class could be used as the callback for queue notifications
+ //if not attached, it can simply ignore the callback, else pass it
+ //on to the connection
+
+ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId,
+ Args& /*args*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ switch (methodId)
+ {
+ case management::Session::METHOD_DETACH :
+ if (handler != 0)
+ {
+ handler->detach();
+ }
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Session::METHOD_CLOSE :
+ /*
+ if (handler != 0)
+ {
+ handler->getConnection().closeChannel(handler->getChannel());
+ }
+ status = Manageable::STATUS_OK;
+ break;
+ */
+
+ case management::Session::METHOD_SOLICITACK :
+ case management::Session::METHOD_RESETLIFESPAN :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
+}
+
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.h b/qpid/cpp/src/qpid/broker/PreviewSessionState.h
new file mode 100644
index 0000000000..6e8523317c
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.h
@@ -0,0 +1,124 @@
+#ifndef QPID_BROKER_PREVIEWSESSION_H
+#define QPID_BROKER_PREVIEWSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SessionState.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Session.h"
+#include "SessionContext.h"
+
+#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <set>
+#include <vector>
+#include <ostream>
+
+namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
+namespace broker {
+
+class SemanticHandler;
+class PreviewSessionHandler;
+class PreviewSessionManager;
+class Broker;
+class ConnectionState;
+
+/**
+ * Broker-side session state includes sessions handler chains, which may
+ * themselves have state.
+ */
+class PreviewSessionState : public framing::SessionState,
+ public SessionContext,
+ public framing::FrameHandler::Chains,
+ public management::Manageable
+{
+ public:
+ ~PreviewSessionState();
+ bool isAttached() { return handler; }
+
+ void detach();
+ void attach(PreviewSessionHandler& handler);
+
+
+ PreviewSessionHandler* getHandler();
+
+ /** @pre isAttached() */
+ framing::AMQP_ClientProxy& getProxy();
+
+ /** @pre isAttached() */
+ ConnectionState& getConnection();
+
+ uint32_t getTimeout() const { return timeout; }
+ Broker& getBroker() { return broker; }
+ framing::ProtocolVersion getVersion() const { return version; }
+
+ /** OutputControl **/
+ void activateOutput();
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
+ // Normally SessionManager creates sessions.
+ PreviewSessionState(PreviewSessionManager*,
+ PreviewSessionHandler* out,
+ uint32_t timeout,
+ uint32_t ackInterval);
+
+
+ private:
+ PreviewSessionManager* factory;
+ PreviewSessionHandler* handler;
+ framing::Uuid id;
+ uint32_t timeout;
+ sys::AbsTime expiry; // Used by SessionManager.
+ Broker& broker;
+ framing::ProtocolVersion version;
+ sys::Mutex lock;
+ boost::scoped_ptr<SemanticHandler> semanticHandler;
+ management::Session::shared_ptr mgmtObject;
+
+ friend class PreviewSessionManager;
+};
+
+
+inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) {
+ return out << session.getId();
+}
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_SESSION_H*/
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
index 2a79496144..fdde7ec18c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -22,7 +22,6 @@
#include "SemanticHandler.h"
#include "SemanticState.h"
#include "SessionContext.h"
-#include "SessionState.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
#include "qpid/framing/ExecutionCompleteBody.h"
@@ -37,9 +36,9 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(SessionState& s) :
+SemanticHandler::SemanticHandler(SessionContext& s) :
state(*this,s), session(s),
- msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()),
+ msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()),
ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2))
{}
@@ -164,13 +163,8 @@ void SemanticHandler::handleContent(AMQFrame& frame)
DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
- SessionContext* handler = session.getHandler();
- if (handler) {
- uint32_t maxFrameSize = handler->getConnection().getFrameMax();
- MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize);
- } else {
- QPID_LOG(error, "Dropping message as session is no longer attached to a channel.");
- }
+ uint32_t maxFrameSize = session.getConnection().getFrameMax();
+ MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
return outgoing.hwm;
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h
index d7f3ec8799..893a0cbded 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.h
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h
@@ -46,7 +46,7 @@ class AMQHeaderBody;
namespace broker {
-class SessionState;
+class SessionContext;
class SemanticHandler : public DeliveryAdapter,
public framing::FrameHandler,
@@ -56,7 +56,7 @@ class SemanticHandler : public DeliveryAdapter,
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
SemanticState state;
- SessionState& session;
+ SessionContext& session;
// TODO aconway 2007-09-20: Why are these on the handler rather than the
// state?
IncomingExecutionContext incoming;
@@ -78,10 +78,10 @@ class SemanticHandler : public DeliveryAdapter,
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
//Connection& getConnection() { return session.getConnection(); }
- Broker& getBroker() { return session.getBroker(); }
+ Broker& getBroker() { return session.getConnection().getBroker(); }
public:
- SemanticHandler(SessionState& session);
+ SemanticHandler(SessionContext& session);
//frame handler:
void handle(framing::AMQFrame& frame);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 7b4035604f..9b44f31e14 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "SessionState.h"
+#include "SessionContext.h"
#include "BrokerAdapter.h"
#include "Queue.h"
#include "Connection.h"
@@ -56,7 +56,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid::ptr_map;
-SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
+SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
: session(ss),
deliveryAdapter(da),
prefetchSize(0),
@@ -263,21 +263,16 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (parent->getSession().isAttached() && accept(msg.payload)) {
- allocateCredit(msg.payload);
- DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg, token);
- if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
- }
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg.payload);
- }
- return true;
- } else {
- QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent);
- return false;
+ allocateCredit(msg.payload);
+ DeliveryId deliveryTag =
+ parent->deliveryAdapter.deliver(msg, token);
+ if (windowing || ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+ }
+ if (acquire && !ackExpected) {
+ queue->dequeue(0, msg.payload);
}
+ return true;
}
bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
@@ -331,7 +326,7 @@ void SemanticState::cancel(ConsumerImpl& c)
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- Queue::tryAutoDelete(getSession().getBroker(), queue);
+ Queue::tryAutoDelete(session.getBroker(), queue);
}
}
}
@@ -352,7 +347,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
if (!cacheExchange || cacheExchange->getName() != exchangeName){
- cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName);
+ cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 7fc6e4167c..cc9c0e1e9b 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -45,7 +45,7 @@
namespace qpid {
namespace broker {
-class SessionState;
+class SessionContext;
/**
* SemanticState holds the L3 and L4 state of an open session, whether
@@ -98,7 +98,7 @@ class SemanticState : public framing::FrameHandler::Chains,
typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
- SessionState& session;
+ SessionContext& session;
DeliveryAdapter& deliveryAdapter;
Queue::shared_ptr defaultQueue;
ConsumerImplMap consumers;
@@ -129,10 +129,10 @@ class SemanticState : public framing::FrameHandler::Chains,
void cancel(ConsumerImpl&);
public:
- SemanticState(DeliveryAdapter&, SessionState&);
+ SemanticState(DeliveryAdapter&, SessionContext&);
~SemanticState();
- SessionState& getSession() { return session; }
+ SessionContext& getSession() { return session; }
/**
* Get named queue, never returns 0.
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index a27b43cf65..a289310b15 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -25,6 +25,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/OutputControl.h"
#include "ConnectionState.h"
@@ -33,17 +34,13 @@
namespace qpid {
namespace broker {
-class SessionContext : public framing::FrameHandler::InOutHandler
+class SessionContext : public sys::OutputControl
{
public:
- SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {}
virtual ~SessionContext(){}
virtual ConnectionState& getConnection() = 0;
- virtual const ConnectionState& getConnection() const = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
- virtual const framing::AMQP_ClientProxy& getProxy() const = 0;
- virtual void detach() = 0;
- virtual framing::ChannelId getChannel() const = 0;
+ virtual Broker& getBroker() = 0;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 1cb10d0c19..0e3c9928d1 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -21,6 +21,7 @@
#include "SessionHandler.h"
#include "SessionState.h"
#include "Connection.h"
+#include "ConnectionState.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/ClientInvoker.h"
@@ -36,7 +37,7 @@ using namespace std;
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : SessionContext(c.getOutput()),
+ : InOutHandler(0, &out),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
- session->in.handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (!ignoring) {
- throw ChannelErrorException(
- QPID_MSG("Channel " << channel.get() << " is not open"));
+ if (!ignoring) {
+ if (m &&
+ (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) ||
+ invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+ return;
+ } else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else {
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
}
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
@@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) {
}
void SessionHandler::assertAttached(const char* method) const {
- if (!session.get())
+ if (!session.get()) {
+ std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
throw ChannelErrorException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
+ }
}
void SessionHandler::assertClosed(const char* method) const {
@@ -208,4 +215,32 @@ void SessionHandler::detached()
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
+void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
+{
+ assertAttached("complete");
+ session->complete(cumulative, range);
+}
+
+void SessionHandler::flush()
+{
+ assertAttached("flush");
+ session->flush();
+}
+void SessionHandler::sync()
+{
+ assertAttached("sync");
+ session->sync();
+}
+
+void SessionHandler::noop()
+{
+ assertAttached("noop");
+ session->noop();
+}
+
+void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+ //never actually sent by client at present
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 5a72bfb12d..e6bc463a82 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -28,7 +28,7 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelHandler.h"
-#include "SessionContext.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/noncopyable.hpp>
@@ -36,16 +36,18 @@ namespace qpid {
namespace broker {
class Connection;
+class ConnectionState;
class SessionState;
/**
* A SessionHandler is associated with each active channel. It
- * receives incoming frames, handles session commands and manages the
+ * receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::AMQP_ClientOperations::SessionHandler,
- public SessionContext,
+ public framing::AMQP_ServerOperations::ExecutionHandler,
+ public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
public:
@@ -90,12 +92,17 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
void detached();
+ //Execution methods:
+ void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
+ void flush();
+ void noop();
+ void result(uint32_t command, const std::string& data);
+ void sync();
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
-
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp
index aa7ac9a8bb..571d3365db 100644
--- a/qpid/cpp/src/qpid/broker/SessionManager.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp
@@ -45,7 +45,7 @@ SessionManager::~SessionManager() {}
// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
std::auto_ptr<SessionState> SessionManager::open(
- SessionContext& h, uint32_t timeout_)
+ SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
std::auto_ptr<SessionState> session(
diff --git a/qpid/cpp/src/qpid/broker/SessionManager.h b/qpid/cpp/src/qpid/broker/SessionManager.h
index 94956a83ed..7e8bd18f57 100644
--- a/qpid/cpp/src/qpid/broker/SessionManager.h
+++ b/qpid/cpp/src/qpid/broker/SessionManager.h
@@ -38,7 +38,7 @@ namespace qpid {
namespace broker {
class SessionState;
-class SessionContext;
+class SessionHandler;
/**
* Create and manage SessionState objects.
@@ -57,7 +57,7 @@ class SessionManager : private boost::noncopyable {
~SessionManager();
/** Open a new active session, caller takes ownership */
- std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_);
+ std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_);
/** Suspend a session, start it's timeout counter.
* The factory takes ownership.
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index b6c59cfb3b..573a567da6 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -19,12 +19,16 @@
*
*/
#include "SessionState.h"
-#include "SessionManager.h"
-#include "SessionContext.h"
-#include "ConnectionState.h"
#include "Broker.h"
+#include "ConnectionState.h"
+#include "MessageDelivery.h"
#include "SemanticHandler.h"
+#include "SessionManager.h"
+#include "SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/ServerInvoker.h"
+
+#include <boost/bind.hpp>
namespace qpid {
namespace broker {
@@ -37,17 +41,17 @@ using qpid::management::Manageable;
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
version(h->getConnection().getVersion()),
- semanticHandler(new SemanticHandler(*this))
+ semanticState(*this, *this),
+ adapter(semanticState),
+ msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
+ ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
{
- in.next = semanticHandler.get();
- out.next = &handler->out;
-
- getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+ getConnection().outputTasks.addOutputTask(&semanticState);
Manageable* parent = broker.GetVhostObject ();
@@ -76,7 +80,7 @@ SessionState::~SessionState() {
mgmtObject->resourceDestroy ();
}
-SessionContext* SessionState::getHandler() {
+SessionHandler* SessionState::getHandler() {
return handler;
}
@@ -91,20 +95,19 @@ ConnectionState& SessionState::getConnection() {
}
void SessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+ getConnection().outputTasks.removeOutputTask(&semanticState);
Mutex::ScopedLock l(lock);
- handler = 0; out.next = 0;
+ handler = 0;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (0);
}
}
-void SessionState::attach(SessionContext& h) {
+void SessionState::attach(SessionHandler& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
- out.next = &handler->out;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
@@ -112,7 +115,7 @@ void SessionState::attach(SessionContext& h) {
mgmtObject->set_channelId (h.getChannel());
}
}
- h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+ h.getConnection().outputTasks.addOutputTask(&semanticState);
}
void SessionState::activateOutput()
@@ -165,5 +168,100 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
+void SessionState::handleCommand(framing::AMQMethodBody* method)
+{
+ SequenceNumber id = incoming.next();
+ Invoker::Result invocation = invoke(adapter, *method);
+ incoming.complete(id);
+
+ if (!invocation.wasHandled()) {
+ throw NotImplementedException("Not implemented");
+ } else if (invocation.hasResult()) {
+ getProxy().getExecution().result(id.getValue(), invocation.getResult());
+ }
+ if (method->isSync()) {
+ incoming.sync(id);
+ sendCompletion();
+ }
+ //TODO: if window gets too large send unsolicited completion
+}
+
+void SessionState::handleContent(AMQFrame& frame)
+{
+ intrusive_ptr<Message> msg(msgBuilder.getMessage());
+ if (!msg) {//start of frameset will be indicated by frame flags
+ msgBuilder.start(incoming.next());
+ msg = msgBuilder.getMessage();
+ }
+ msgBuilder.handle(frame);
+ if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
+ msg->setPublisher(&getConnection());
+ semanticState.handle(msg);
+ msgBuilder.end();
+ incoming.track(msg);
+ if (msg->getFrames().getMethod()->isSync()) {
+ incoming.sync(msg->getCommandId());
+ sendCompletion();
+ }
+ }
+}
+
+void SessionState::handle(AMQFrame& frame)
+{
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content. (For now, assume all single frame
+ //assmblies are non-content bearing and all content-bearing
+ //assmeblies will have more than one frame):
+ if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod());
+ } else {
+ handleContent(frame);
+ }
+
+}
+
+DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
+{
+ uint32_t maxFrameSize = getConnection().getFrameMax();
+ MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
+ return outgoing.hwm;
+}
+
+void SessionState::sendCompletion()
+{
+ SequenceNumber mark = incoming.getMark();
+ SequenceNumberSet range = incoming.getRange();
+ getProxy().getExecution().complete(mark.getValue(), range);
+}
+
+void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range)
+{
+ //record:
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ //ack messages:
+ semanticState.ackCumulative(mark.getValue());
+ }
+ range.processRanges(ackOp);
+}
+
+void SessionState::flush()
+{
+ incoming.flush();
+ sendCompletion();
+}
+
+void SessionState::sync()
+{
+ incoming.sync();
+ sendCompletion();
+}
+
+void SessionState::noop()
+{
+ incoming.noop();
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 8a12e580b7..98c21a8ab5 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -27,10 +27,15 @@
#include "qpid/framing/SessionState.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/OutputControl.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Session.h"
+#include "BrokerAdapter.h"
+#include "DeliveryAdapter.h"
+#include "MessageBuilder.h"
+#include "SessionContext.h"
+#include "SemanticState.h"
+#include "IncomingExecutionContext.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -47,8 +52,7 @@ class AMQP_ClientProxy;
namespace broker {
-class SemanticHandler;
-class SessionContext;
+class SessionHandler;
class SessionManager;
class Broker;
class ConnectionState;
@@ -58,8 +62,8 @@ class ConnectionState;
* themselves have state.
*/
class SessionState : public framing::SessionState,
- public framing::FrameHandler::Chains,
- public sys::OutputControl,
+ public SessionContext,
+ public DeliveryAdapter,
public management::Manageable
{
public:
@@ -67,10 +71,10 @@ class SessionState : public framing::SessionState,
bool isAttached() { return handler; }
void detach();
- void attach(SessionContext& handler);
+ void attach(SessionHandler& handler);
- SessionContext* getHandler();
+ SessionHandler* getHandler();
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
@@ -85,6 +89,19 @@ class SessionState : public framing::SessionState,
/** OutputControl **/
void activateOutput();
+ void handle(framing::AMQFrame& frame);
+ void handleCommand(framing::AMQMethodBody* method);
+ void handleContent(framing::AMQFrame& frame);
+
+ void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
+ void flush();
+ void noop();
+ void sync();
+ void sendCompletion();
+
+ //delivery adapter methods:
+ DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
+
// Manageable entry points
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable::status_t
@@ -92,21 +109,32 @@ class SessionState : public framing::SessionState,
// Normally SessionManager creates sessions.
SessionState(SessionManager*,
- SessionContext* out,
+ SessionHandler* out,
uint32_t timeout,
uint32_t ackInterval);
private:
+ typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
+
SessionManager* factory;
- SessionContext* handler;
+ SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
Broker& broker;
framing::ProtocolVersion version;
sys::Mutex lock;
- boost::scoped_ptr<SemanticHandler> semanticHandler;
+
+ SemanticState semanticState;
+ BrokerAdapter adapter;
+ MessageBuilder msgBuilder;
+
+ //execution state
+ IncomingExecutionContext incoming;
+ framing::Window outgoing;
+ RangedOperation ackOp;
+
management::Session::shared_ptr mgmtObject;
friend class SessionManager;
diff --git a/qpid/cpp/src/qpid/client/Session.h b/qpid/cpp/src/qpid/client/Session.h
index 3293af60fe..5d91f289e2 100644
--- a/qpid/cpp/src/qpid/client/Session.h
+++ b/qpid/cpp/src/qpid/client/Session.h
@@ -27,7 +27,7 @@ namespace qpid {
namespace client {
/**
- * Session is currently just an alias for Session_0_10
+ * Session is currently just an alias for Session_99_0
*
* \ingroup clientapi
*/
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index bca6c49c13..5152aa2e43 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,7 +17,7 @@
*/
#include "Cluster.h"
-#include "qpid/broker/SessionState.h"
+#include "qpid/broker/PreviewSessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,18 +32,18 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::SessionState;
+using broker::PreviewSessionState;
namespace {
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- SessionState& session;
+ PreviewSessionState& session;
Cluster& cluster;
bool busy;
Monitor lock;
- ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+ ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
void handle(AMQFrame& f) {
Mutex::ScopedLock l(lock);
@@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) {
c.next = h;
}
-struct SessionObserver : public broker::SessionManager::Observer {
+struct SessionObserver : public broker::PreviewSessionManager::Observer {
Cluster& cluster;
SessionObserver(Cluster& c) : cluster(c) {}
- void opened(SessionState& s) {
+ void opened(PreviewSessionState& s) {
// FIXME aconway 2008-01-29: IList for memory management.
ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index b62b2be5f1..733db8003d 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -62,7 +62,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
+ intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -116,7 +116,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- intrusive_ptr<broker::SessionManager::Observer> observer;
+ intrusive_ptr<broker::PreviewSessionManager::Observer> observer;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index ceafa389b0..0ea3953175 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin {
cluster = boost::in_place(options.name,
options.getUrl(broker->getPort()),
boost::ref(*broker));
- broker->getSessionManager().add(cluster->getObserver());
+ broker->getPreviewSessionManager().add(cluster->getObserver());
}
}
};
diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
index 1be8856c13..b15e14d6f6 100644
--- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
+++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
@@ -33,6 +33,7 @@ namespace qpid {
namespace framing {
static ProtocolVersion highestProtocolVersion(99, 0);
+//static ProtocolVersion highestProtocolVersion(0, 10);
} /* namespace framing */
} /* namespace qpid */
diff --git a/qpid/cpp/src/qpid/framing/Proxy.h b/qpid/cpp/src/qpid/framing/Proxy.h
index b6ac897e96..86b99a83b0 100644
--- a/qpid/cpp/src/qpid/framing/Proxy.h
+++ b/qpid/cpp/src/qpid/framing/Proxy.h
@@ -39,6 +39,7 @@ class Proxy
void send(const AMQBody&);
ProtocolVersion getVersion() const;
+ FrameHandler& getHandler() { return out; }
protected:
FrameHandler& out;
diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp
index 700aeef47c..715cdaec2a 100644
--- a/qpid/cpp/src/tests/exception_test.cpp
+++ b/qpid/cpp/src/tests/exception_test.cpp
@@ -92,7 +92,7 @@ BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) {
BOOST_CHECK_THROW(session.close(), InternalErrorException);
}
-BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) {
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, ProxySessionFixture) {
BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
}
diff --git a/qpid/cpp/src/tests/serialize.cpp b/qpid/cpp/src/tests/serialize.cpp
index 72a92ee226..a120be6458 100644
--- a/qpid/cpp/src/tests/serialize.cpp
+++ b/qpid/cpp/src/tests/serialize.cpp
@@ -79,7 +79,7 @@ template <class A, class B, class C, class D> struct concat4 { typedef typename
typedef mpl::vector<Bit, Boolean, Char, Int32, Int64, Int8, Uint16, CharUtf32, Uint32, Uint64, Bin8, Uint8>::type IntegralTypes;
typedef mpl::vector<Bin1024, Bin128, Bin16, Bin256, Bin32, Bin40, Bin512, Bin64, Bin72>::type BinTypes;
typedef mpl::vector<Double, Float>::type FloatTypes;
-typedef mpl::vector<SequenceNo, Uuid, DateTime, Dec32, Dec64> FixedSizeClassTypes;
+typedef mpl::vector<SequenceNo, Uuid, Datetime, Dec32, Dec64> FixedSizeClassTypes;
typedef mpl::vector<Vbin8, Str8Latin, Str8, Str8Utf16, Vbin16, Str16Latin, Str16, Str16Utf16, Vbin32> VariableSizeTypes;
@@ -106,7 +106,7 @@ BOOST_AUTO_TEST_CASE(testNetworkByteOrder) {
void testValue(bool& b) { b = true; }
template <class T> typename boost::enable_if<boost::is_arithmetic<T> >::type testValue(T& n) { n=42; }
void testValue(long long& l) { l = 12345; }
-void testValue(DateTime& dt) { dt = qpid::sys::now(); }
+void testValue(Datetime& dt) { dt = qpid::sys::now(); }
void testValue(Uuid& uuid) { uuid=Uuid(true); }
template <class E, class M> void testValue(Decimal<E,M>& d) { d.exponent=2; d.mantissa=1234; }
void testValue(SequenceNo& s) { s = 42; }
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index bbdb501b80..d1e3293a3e 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -20,7 +20,7 @@
-
-->
-<amqp major="0" minor="10" port="5672">
+<amqp major="99" minor="0" port="5672">
<class name = "cluster" index = "201">
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 0e36a09bbd..e02b3d6643 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import java.net.URISyntaxException;
+
import javax.jms.Destination;
import javax.naming.NamingException;
import javax.naming.Reference;
@@ -31,7 +33,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
public abstract class AMQDestination implements Destination, Referenceable
@@ -50,6 +51,8 @@ public abstract class AMQDestination implements Destination, Referenceable
private AMQShortString _routingKey;
+ private AMQShortString[] _bindingKeys;
+
private String _url;
private AMQShortString _urlAsShortString;
@@ -64,7 +67,7 @@ public abstract class AMQDestination implements Destination, Referenceable
public static final Integer TOPIC_TYPE = Integer.valueOf(2);
public static final Integer UNKNOWN_TYPE = Integer.valueOf(3);
- protected AMQDestination(String url) throws URLSyntaxException
+ protected AMQDestination(String url) throws URISyntaxException
{
this(new AMQBindingURL(url));
}
@@ -79,26 +82,43 @@ public abstract class AMQDestination implements Destination, Referenceable
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName());
_routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey());
+ _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, routingKey, false, false, queueName);
+ this(exchangeName, exchangeClass, routingKey, false, false, queueName, null);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, AMQShortString[] bindingKeys)
+ {
+ this(exchangeName, exchangeClass, routingKey, false, false, queueName,bindingKeys);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName)
{
- this(exchangeName, exchangeClass, destinationName, false, false, null);
+ this(exchangeName, exchangeClass, destinationName, false, false, null,null);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
- boolean isAutoDelete, AMQShortString queueName)
+ boolean isAutoDelete, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false);
+ this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,null);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
- boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ boolean isAutoDelete, AMQShortString queueName,AMQShortString[] bindingKeys)
+ {
+ this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,bindingKeys);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable){
+ this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,null);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
{
// If used with a fannout exchange, the routing key can be null
if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null)
@@ -120,6 +140,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_isAutoDelete = isAutoDelete;
_queueName = queueName;
_isDurable = isDurable;
+ _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
}
public AMQShortString getEncodedName()
@@ -181,6 +202,20 @@ public abstract class AMQDestination implements Destination, Referenceable
return _routingKey;
}
+ public AMQShortString[] getBindingKeys()
+ {
+ if (_bindingKeys != null && _bindingKeys.length > 0)
+ {
+ return _bindingKeys;
+ }
+ else
+ {
+ // catering to the common use case where the
+ //routingKey is the same as the bindingKey.
+ return new AMQShortString[]{_routingKey};
+ }
+ }
+
public boolean isExclusive()
{
return _isExclusive;
@@ -239,6 +274,21 @@ public abstract class AMQDestination implements Destination, Referenceable
sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
}
+ // We can't allow both routingKey and bindingKey
+ if (_routingKey == null && _bindingKeys != null && _bindingKeys.length>0)
+ {
+
+ for (AMQShortString bindingKey:_bindingKeys)
+ {
+ sb.append(BindingURL.OPTION_BINDING_KEY);
+ sb.append("='");
+ sb.append(bindingKey);
+ sb.append("'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+
+ }
+ }
+
if (_isDurable)
{
sb.append(BindingURL.OPTION_DURABLE);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
index 924b20e3ba..78b01add14 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
@@ -61,8 +61,14 @@ public class AMQQueue extends AMQDestination implements Queue
public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName)
{
super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
- false, queueName, false); }
+ false, queueName, false);
+ }
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+ {
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false,
+ false, queueName, false,bindingKeys);
+ }
/**
* Create a reference to a non temporary queue. Note this does not actually imply the queue exists.
@@ -126,11 +132,15 @@ public class AMQQueue extends AMQDestination implements Queue
this(exchangeName, routingKey, queueName, exclusive, autoDelete, false);
}
-
public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable)
{
+ this(exchangeName,routingKey,queueName,exclusive,autoDelete,durable,null);
+ }
+
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys)
+ {
super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive,
- autoDelete, queueName, durable);
+ autoDelete, queueName, durable, bindingKeys);
}
public AMQShortString getRoutingKey()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ac5055ccd0..95d9b45b10 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.io.Serializable;
+import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
@@ -86,7 +87,6 @@ import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -564,19 +564,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException
+ final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
{
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- QueueBindBody body = getMethodRegistry().createQueueBindBody(getTicket(),queueName,exchangeName,routingKey,false,arguments);
-
- AMQFrame queueBind = body.generateFrame(_channelId);
-
- getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
-
+ sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
return null;
}
}, _connection).execute();
@@ -587,12 +582,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
if( consumer.getQueuename() != null)
{
- bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName());
+ bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
}
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException, FailoverException;
+ final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
/**
@@ -1036,7 +1031,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
return new AMQQueue(new AMQBindingURL(queueName));
}
- catch (URLSyntaxException urlse)
+ catch (URISyntaxException urlse)
{
JMSException jmse = new JMSException(urlse.getReason());
jmse.setLinkedException(urlse);
@@ -1253,7 +1248,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
return new AMQTopic(new AMQBindingURL(topicName));
}
- catch (URLSyntaxException urlse)
+ catch (URISyntaxException urlse)
{
JMSException jmse = new JMSException(urlse.getReason());
jmse.setLinkedException(urlse);
@@ -1380,6 +1375,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
+ public void declareAndBind(AMQDestination amqd)
+ throws
+ AMQException
+ {
+ AMQProtocolHandler protocolHandler = getProtocolHandler();
+ declareExchange(amqd, protocolHandler, false);
+ AMQShortString queueName = declareQueue(amqd, protocolHandler);
+ bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
+ }
+
/**
* Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
*
@@ -1820,35 +1825,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
- boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
- throws JMSException
- {
- try
- {
- AMQMethodEvent response =
- new FailoverRetrySupport<AMQMethodEvent, AMQException>(
- new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
- {
- public AMQMethodEvent execute() throws AMQException, FailoverException
- {
- ExchangeBoundBody body = getMethodRegistry().createExchangeBoundBody(exchangeName, routingKey, queueName);
- AMQFrame boundFrame = body.generateFrame(_channelId);
-
- return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
-
- }
- }, _connection).execute();
-
- // Extract and return the response code from the query.
- ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
-
- return (responseBody.getReplyCode() == 0);
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
- }
- }
+ public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException;
+
+ public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
/**
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
@@ -2509,16 +2489,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public void declareAndBind(AMQDestination amqd)
- throws
- AMQException
- {
- AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
- AMQShortString queueName = declareQueue(amqd, protocolHandler);
- bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName());
- }
-
/**
* Callers must hold the failover mutex before calling this method.
*
@@ -2540,7 +2510,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
consumer.setQueuename(queueName);
// bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName());
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 407f1f3786..8039e3a163 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import javax.jms.*;
import javax.jms.IllegalStateException;
+
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
@@ -159,7 +160,7 @@ public class AMQSession_0_10 extends AMQSession
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
-
+
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -213,7 +214,7 @@ public class AMQSession_0_10 extends AMQSession
* @param arguments 0_8 specific
*/
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
- final FieldTable arguments, final AMQShortString exchangeName)
+ final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
throws AMQException, FailoverException
{
Map args = FiledTableSupport.convertToMap(arguments);
@@ -222,7 +223,12 @@ public class AMQSession_0_10 extends AMQSession
{
args.put("x-match", "any");
}
- getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), args);
+
+ for (AMQShortString rk: destination.getBindingKeys())
+ {
+ _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
+ getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
+ }
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -238,6 +244,7 @@ public class AMQSession_0_10 extends AMQSession
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ getQpidSession().sync();
getQpidSession().sessionClose();
getCurrentException();
}
@@ -350,19 +357,37 @@ public class AMQSession_0_10 extends AMQSession
/**
* Bind a queue with an exchange.
*/
- public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName,
- final AMQShortString routingKey) throws JMSException
+
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+ throws JMSException
+ {
+ return isQueueBound(exchangeName,queueName,routingKey,null);
+ }
+
+ public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ {
+ return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
+ }
+
+ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
+ throws JMSException
{
String rk = "";
boolean res;
- if (routingKey != null)
+ if (bindingKeys != null && bindingKeys.length>0)
+ {
+ rk = bindingKeys[0].toString();
+ }
+ else if (routingKey != null)
{
rk = routingKey.toString();
}
+
Future<BindingQueryResult> result =
- getQpidSession().bindingQuery(exchangeName.toString(), queueName.toString(), rk, null);
+ getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null);
BindingQueryResult bindingQueryResult = result.get();
- if (routingKey == null)
+
+ if (rk == null)
{
res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
}
@@ -577,7 +602,7 @@ public class AMQSession_0_10 extends AMQSession
{
// this is done so that we can produce to a temporary queue beofre we create a consumer
sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
- sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName());
+ sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
result.setQueueName(result.getRoutingKey());
}
catch (Exception e)
@@ -701,7 +726,7 @@ public class AMQSession_0_10 extends AMQSession
AMQShortString topicName;
if (topic instanceof AMQTopic)
{
- topicName=((AMQTopic) topic).getRoutingKey();
+ topicName=((AMQTopic) topic).getBindingKeys()[0];
}
else
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 59129bd28c..0450cffcaf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -94,7 +94,7 @@ public class AMQSession_0_8 extends AMQSession
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName) throws AMQException, FailoverException
+ final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
{
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
(getTicket(),queueName,exchangeName,routingKey,false,arguments).
@@ -171,6 +171,11 @@ public class AMQSession_0_8 extends AMQSession
}
}
+ public boolean isQueueBound(final AMQDestination destination) throws JMSException
+ {
+ return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
+ }
+
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index e571b5437e..40041afdc6 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -49,6 +49,11 @@ public class AMQTopic extends AMQDestination implements Topic
super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false);
}
+ public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys)
+ {
+ super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys);
+ }
+
public AMQTopic(AMQConnection conn, String routingKey)
{
this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey));
@@ -77,6 +82,11 @@ public class AMQTopic extends AMQDestination implements Topic
super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
}
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
+ {
+ super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
+ }
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 74e466acc4..25494428ea 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -51,7 +51,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
/** The connection being used by this consumer */
protected final AMQConnection _connection;
- private final String _messageSelector;
+ protected final String _messageSelector;
private final boolean _noLocal;
@@ -740,6 +740,8 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
else
{
+ // we should not be allowed to add a message is the
+ // consumer is closed
_synchronousQueue.put(jmsMessage);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index ef0f219092..74aac53cda 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -38,7 +38,6 @@ import javax.jms.JMSException;
import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.Iterator;
/**
@@ -122,8 +121,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
catch (AMQException e)
{
+ _logger.error("Receivecd an Exception when receiving message",e);
try
{
+
getSession().getAMQConnection().getExceptionListener()
.onException(new JMSAMQException("Error when receiving message", e));
}
@@ -135,6 +136,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
if (messageOk)
{
+ _logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
}
@@ -290,7 +292,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
- if (getMessageSelector() != null && !getMessageSelector().equals(""))
+ if (_messageSelector != null && !_messageSelector.equals(""))
{
messageOk = _filter.matches(message);
}
@@ -332,6 +334,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_logger.debug("filterMessage - trying to acquire message");
}
messageOk = acquireMessage(message);
+ _logger.debug("filterMessage - *************************************");
+ _logger.debug("filterMessage - message acquire status : " + messageOk);
+ _logger.debug("filterMessage - *************************************");
}
return messageOk;
}
@@ -393,13 +398,29 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_0_10session.getQpidSession()
.messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+
+ _logger.debug("acquireMessage, sent acquire message to broker");
+
_0_10session.getQpidSession().sync();
+
+ _logger.debug("acquireMessage, returned from sync");
+
RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages();
+
+ _logger.debug("acquireMessage, acquired range set " + acquired);
+
if (acquired != null && acquired.size() > 0)
{
result = true;
}
+
+ _logger.debug("acquireMessage, Trying to get current exception ");
+
_0_10session.getCurrentException();
+
+ _logger.debug("acquireMessage, returned from getting current exception ");
+
+ _logger.debug("acquireMessage, acquired range set " + acquired + " now returning " );
}
return result;
}
@@ -473,4 +494,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
_session.rejectMessage(message, true);
}
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 42c1d687cb..c6baf0b0fc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -17,22 +17,22 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpidity.njms.ExceptionHelper;
import org.apache.qpidity.nclient.util.ByteBufferMessage;
-import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpidity.njms.ExceptionHelper;
import org.apache.qpidity.transport.DeliveryProperties;
-
-import javax.jms.Message;
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import org.apache.qpidity.transport.ReplyTo;
/**
* This is a 0_10 message producer.
@@ -154,12 +154,20 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
String replyToURL = contentHeaderProperties.getReplyToAsString();
if (replyToURL != null)
{
+ if(_logger.isDebugEnabled())
+ {
+ StringBuffer b = new StringBuffer();
+ b.append("\n==========================");
+ b.append("\nReplyTo : " + replyToURL);
+ b.append("\n==========================");
+ _logger.debug(b.toString());
+ }
AMQBindingURL dest;
try
{
dest = new AMQBindingURL(replyToURL);
}
- catch (URLSyntaxException e)
+ catch (URISyntaxException e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
@@ -198,8 +206,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(),
- destination.getRoutingKey());
+ return _session.isQueueBound(destination);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index abca931acc..5baf058668 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -20,30 +20,34 @@
*/
package org.apache.qpid.client.message;
-import org.apache.commons.collections.map.ReferenceMap;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.UUID;
-import org.apache.mina.common.ByteBuffer;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.UUID;
-import java.io.IOException;
-import java.net.URISyntaxException;
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 4eeb702703..bf2a2fdd73 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -140,8 +140,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
props.setType(mprop.getType());
props.setUserId(mprop.getUserId());
props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));
- AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
- message.receivedFromServer();
+ AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
return message;
}
@@ -152,7 +151,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
{
final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
-
+ msg.receivedFromServer();
return msg;
}
@@ -164,7 +163,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
final AbstractJMSMessage msg =
create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
msg.setJMSRedelivered(redelivered);
-
+ msg.receivedFromServer();
return msg;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index b3f46ab0f6..43ac56dee9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -20,6 +20,24 @@
*/
package org.apache.qpid.jndi;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
+
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQHeadersExchange;
@@ -30,28 +48,9 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class);
@@ -184,6 +183,17 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
Topic t = createTopic(entry.getValue().toString());
if (t != null)
{
+ if (_logger.isDebugEnabled())
+ {
+ StringBuffer b = new StringBuffer();
+ b.append("Creating the topic: " + jndiName + " with the following binding keys ");
+ for (AMQShortString binding:((AMQTopic)t).getBindingKeys())
+ {
+ b.append(binding.toString()).append(",");
+ }
+
+ _logger.debug(b.toString());
+ }
data.put(jndiName, t);
}
}
@@ -219,7 +229,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
}
catch (URISyntaxException urlse)
{
- _logger.warn("Unable to destination:" + urlse);
+ _logger.warn("Unable to create destination:" + urlse, urlse);
return null;
}
@@ -268,7 +278,17 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
}
else if (value instanceof String)
{
- return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value));
+ String[] keys = ((String)value).split(",");
+ AMQShortString[] bindings = new AMQShortString[keys.length];
+ int i = 0;
+ for (String key:keys)
+ {
+ bindings[i] = new AMQShortString(key);
+ i++;
+ }
+ // The Destination has a dual nature. If this was used for a producer the key is used
+ // for the routing key. If it was used for the consumer it becomes the bindingKey
+ return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings);
}
else if (value instanceof BindingURL)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java
new file mode 100644
index 0000000000..83d491baad
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpidity.nclient.impl;
+
+/**
+ * This class holds all the 0.10 client constants which value can be set
+ * through properties.
+ */
+public class Constants
+{
+ static
+ {
+
+ String max="message_size_before_sync";// KB's
+ try
+ {
+ MAX_NOT_SYNC_DATA_LENGH=new Long(System.getProperties().getProperty(max, "200000000"));
+ }
+ catch (NumberFormatException e)
+ {
+ // use default size
+ MAX_NOT_SYNC_DATA_LENGH=200000000;
+ }
+ String flush="message_size_before_flush";
+ try
+ {
+ MAX_NOT_FLUSH_DATA_LENGH=new Long(System.getProperties().getProperty(flush, "2000000"));
+ }
+ catch (NumberFormatException e)
+ {
+ // use default size
+ MAX_NOT_FLUSH_DATA_LENGH=20000000;
+ }
+ }
+
+ /**
+ * The total message size in KBs that can be transferted before
+ * client and broker are synchronized.
+ * A sync will result in the client library releasing the sent messages
+ * from memory. (messages are kept
+ * in memory so client can reconnect to a broker in the event of a failure)
+ * <p>
+ * Property name: message_size_before_sync
+ * <p>
+ * Default value: 200000000
+ */
+ public static long MAX_NOT_SYNC_DATA_LENGH;
+ /**
+ * The total message size in KBs that can be transferted before
+ * messages are flushed.
+ * When a flush returns all messages have reached the broker.
+ * <p>
+ * Property name: message_size_before_flush
+ * <p>
+ * Default value: 200000000
+ */
+ public static long MAX_NOT_FLUSH_DATA_LENGH;
+
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
index 66be1ebc73..88dd212ab6 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,12 +23,18 @@ package org.apache.qpid.test.unit.client.destinationurl;
import junit.framework.TestCase;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.unit.basic.PropertyValueTest;
import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
public class DestinationURLTest extends TestCase
{
- public void testFullURL() throws URLSyntaxException
+ private static final Logger _logger = LoggerFactory.getLogger(DestinationURLTest.class);
+
+ public void testFullURL() throws URISyntaxException
{
String url = "exchange.Class://exchangeName/Destination/Queue";
@@ -43,7 +49,7 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getQueueName().equals("Queue"));
}
- public void testQueue() throws URLSyntaxException
+ public void testQueue() throws URISyntaxException
{
String url = "exchangeClass://exchangeName//Queue";
@@ -58,7 +64,7 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getQueueName().equals("Queue"));
}
- public void testQueueWithOption() throws URLSyntaxException
+ public void testQueueWithOption() throws URISyntaxException
{
String url = "exchangeClass://exchangeName//Queue?option='value'";
@@ -75,7 +81,7 @@ public class DestinationURLTest extends TestCase
}
- public void testDestination() throws URLSyntaxException
+ public void testDestination() throws URISyntaxException
{
String url = "exchangeClass://exchangeName/Destination/";
@@ -90,7 +96,7 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getQueueName().equals(""));
}
- public void testDestinationWithOption() throws URLSyntaxException
+ public void testDestinationWithOption() throws URISyntaxException
{
String url = "exchangeClass://exchangeName/Destination/?option='value'";
@@ -107,7 +113,7 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getOption("option").equals("value"));
}
- public void testDestinationWithMultiOption() throws URLSyntaxException
+ public void testDestinationWithMultiOption() throws URISyntaxException
{
String url = "exchangeClass://exchangeName/Destination/?option='value',option2='value2'";
@@ -123,7 +129,7 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getOption("option2").equals("value2"));
}
- public void testDestinationWithNoExchangeDefaultsToDirect() throws URLSyntaxException
+ public void testDestinationWithNoExchangeDefaultsToDirect() throws URISyntaxException
{
String url = "IBMPerfQueue1?durable='true'";
@@ -138,6 +144,41 @@ public class DestinationURLTest extends TestCase
assertTrue(dest.getOption("durable").equals("true"));
}
+ public void testDestinationWithMultiBindingKeys() throws URISyntaxException
+ {
+
+ String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',bindingkey='key2'";
+
+ AMQBindingURL dest = new AMQBindingURL(url);
+
+ assertTrue(dest.getExchangeClass().equals("exchangeClass"));
+ assertTrue(dest.getExchangeName().equals("exchangeName"));
+ assertTrue(dest.getDestinationName().equals("Destination"));
+ assertTrue(dest.getQueueName().equals(""));
+
+ assertTrue(dest.getBindingKeys().length == 2);
+ }
+
+ // You can only specify only a routing key or binding key, but not both.
+ public void testDestinationIfOnlyRoutingKeyOrBindingKeyIsSpecified() throws URISyntaxException
+ {
+
+ String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',routingkey='key2'";
+ boolean exceptionThrown = false;
+ try
+ {
+
+ AMQBindingURL dest = new AMQBindingURL(url);
+ }
+ catch(URISyntaxException e)
+ {
+ exceptionThrown = true;
+ _logger.info("Exception thrown",e);
+ }
+
+ assertTrue("Failed to throw an URISyntaxException when both the bindingkey and routingkey is specified",exceptionThrown);
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(DestinationURLTest.class);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
index 529a05b2e2..998242925c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
@@ -20,28 +20,29 @@
*/
package org.apache.qpid.url;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-
public class AMQBindingURL implements BindingURL
{
private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class);
String _url;
- AMQShortString _exchangeClass;
- AMQShortString _exchangeName;
- AMQShortString _destinationName;
- AMQShortString _queueName;
+ AMQShortString _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+ AMQShortString _exchangeName = new AMQShortString("");
+ AMQShortString _destinationName = new AMQShortString("");;
+ AMQShortString _queueName = new AMQShortString("");
+ AMQShortString[] _bindingKeys = new AMQShortString[0];
private HashMap<String, String> _options;
- public AMQBindingURL(String url) throws URLSyntaxException
+ public AMQBindingURL(String url) throws URISyntaxException
{
// format:
// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
@@ -52,116 +53,35 @@ public class AMQBindingURL implements BindingURL
parseBindingURL();
}
- private void parseBindingURL() throws URLSyntaxException
+ private void parseBindingURL() throws URISyntaxException
{
- try
- {
- URI connection = new URI(_url);
-
- String exchangeClass = connection.getScheme();
-
- if (exchangeClass == null)
- {
- _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url;
- // URLHelper.parseError(-1, "Exchange Class not specified.", _url);
- parseBindingURL();
-
- return;
- }
- else
- {
- setExchangeClass(exchangeClass);
- }
-
- String exchangeName = connection.getHost();
-
- if (exchangeName == null)
- {
- if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
- {
- setExchangeName("");
- }
- else
- {
- throw URLHelper.parseError(-1, "Exchange Name not specified.", _url);
- }
- }
- else
- {
- setExchangeName(exchangeName);
- }
-
- String queueName;
-
- if ((connection.getPath() == null) || connection.getPath().equals(""))
- {
- throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
- "Destination or Queue requried", _url);
- }
- else
- {
- int slash = connection.getPath().indexOf("/", 1);
- if (slash == -1)
- {
- throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(),
- "Destination requried", _url);
- }
- else
- {
- String path = connection.getPath();
- setDestinationName(path.substring(1, slash));
-
- // We don't set queueName yet as the actual value we use depends on options set
- // when we are dealing with durable subscriptions
-
- queueName = path.substring(slash + 1);
-
- }
- }
-
- URLHelper.parseOptions(_options, connection.getQuery());
-
- processOptions();
-
- // We can now call setQueueName as the URL is full parsed.
-
- setQueueName(queueName);
-
- // Fragment is #string (not used)
- _logger.debug("URL Parsed: " + this);
-
- }
- catch (URISyntaxException uris)
- {
-
- throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
-
- }
+ BindingURLParser parser = new BindingURLParser(_url,this);
+ processOptions();
+ _logger.debug("URL Parsed: " + this);
}
- private void setExchangeClass(String exchangeClass)
+ public void setExchangeClass(String exchangeClass)
{
setExchangeClass(new AMQShortString(exchangeClass));
}
- private void setQueueName(String name) throws URLSyntaxException
+ public void setQueueName(String name)
{
setQueueName(new AMQShortString(name));
}
- private void setDestinationName(String name)
+ public void setDestinationName(String name)
{
setDestinationName(new AMQShortString(name));
}
- private void setExchangeName(String exchangeName)
+ public void setExchangeName(String exchangeName)
{
setExchangeName(new AMQShortString(exchangeName));
}
- private void processOptions()
+ private void processOptions() throws URISyntaxException
{
- // this is where we would parse any options that needed more than just storage.
}
public String getURL()
@@ -210,34 +130,9 @@ public class AMQBindingURL implements BindingURL
return _queueName;
}
- public void setQueueName(AMQShortString name) throws URLSyntaxException
+ public void setQueueName(AMQShortString name)
{
- if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
- {
- if (Boolean.parseBoolean(getOption(OPTION_DURABLE)))
- {
- if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION))
- {
- _queueName =
- new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION));
- }
- else
- {
- throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID
- + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url);
-
- }
- }
- else
- {
- _queueName = null;
- }
- }
- else
- {
- _queueName = name;
- }
-
+ _queueName = name;
}
public String getOption(String key)
@@ -261,7 +156,7 @@ public class AMQBindingURL implements BindingURL
{
if (containsOption(BindingURL.OPTION_ROUTING_KEY))
{
- return new AMQShortString(getOption(OPTION_ROUTING_KEY));
+ return new AMQShortString((String)getOption(OPTION_ROUTING_KEY));
}
else
{
@@ -271,12 +166,29 @@ public class AMQBindingURL implements BindingURL
if (containsOption(BindingURL.OPTION_ROUTING_KEY))
{
- return new AMQShortString(getOption(OPTION_ROUTING_KEY));
+ return new AMQShortString((String)getOption(OPTION_ROUTING_KEY));
}
return getDestinationName();
}
+ public AMQShortString[] getBindingKeys()
+ {
+ if (_bindingKeys != null && _bindingKeys.length>0)
+ {
+ return _bindingKeys;
+ }
+ else
+ {
+ return new AMQShortString[]{getRoutingKey()};
+ }
+ }
+
+ public void setBindingKeys(AMQShortString[] keys)
+ {
+ _bindingKeys = keys;
+ }
+
public void setRoutingKey(AMQShortString key)
{
setOption(OPTION_ROUTING_KEY, key.toString());
@@ -296,6 +208,29 @@ public class AMQBindingURL implements BindingURL
sb.append(URLHelper.printOptions(_options));
- return sb.toString();
+ // temp hack
+ if (getRoutingKey() == null || getRoutingKey().toString().equals(""))
+ {
+
+ if (sb.toString().indexOf("?") == -1)
+ {
+ sb.append("?");
+ }
+ else
+ {
+ sb.append("&");
+ }
+
+ for (AMQShortString key :_bindingKeys)
+ {
+ sb.append(BindingURL.OPTION_BINDING_KEY).append("='").append(key.toString()).append("'&");
+ }
+
+ return sb.toString().substring(0,sb.toString().length()-1);
+ }
+ else
+ {
+ return sb.toString();
+ }
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
index 67be2db86f..25450fea64 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,6 +34,7 @@ public interface BindingURL
public static final String OPTION_CLIENTID = "clientid";
public static final String OPTION_SUBSCRIPTION = "subscription";
public static final String OPTION_ROUTING_KEY = "routingkey";
+ public static final String OPTION_BINDING_KEY = "bindingkey";
String getURL();
@@ -52,5 +53,7 @@ public interface BindingURL
AMQShortString getRoutingKey();
+ AMQShortString[] getBindingKeys();
+
String toString();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java
new file mode 100644
index 0000000000..5d26e7e65b
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java
@@ -0,0 +1,445 @@
+package org.apache.qpid.url;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BindingURLParser
+{
+ private static final char PROPERTY_EQUALS_CHAR = '=';
+ private static final char PROPERTY_SEPARATOR_CHAR = '&';
+ private static final char ALTERNATIVE_PROPERTY_SEPARATOR_CHAR = ',';
+ private static final char FORWARD_SLASH_CHAR = '/';
+ private static final char QUESTION_MARK_CHAR = '?';
+ private static final char SINGLE_QUOTE_CHAR = '\'';
+ private static final char COLON_CHAR = ':';
+ private static final char END_OF_URL_MARKER_CHAR = '%';
+
+ private static final Logger _logger = LoggerFactory.getLogger(BindingURLImpl.class);
+
+ private char[] _url;
+ private AMQBindingURL _bindingURL;
+ private BindingURLParserState _currentParserState;
+ private String _error;
+ private int _index = 0;
+ private String _currentPropName;
+ private Map<String,Object> _options = new HashMap<String,Object>();
+
+ //<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ public BindingURLParser(String url,AMQBindingURL bindingURL) throws URISyntaxException
+ {
+ _url = (url + END_OF_URL_MARKER_CHAR).toCharArray();
+ _bindingURL = bindingURL;
+ _currentParserState = BindingURLParserState.BINDING_URL_START;
+ BindingURLParserState prevState = _currentParserState;
+
+ try
+ {
+ while (_currentParserState != BindingURLParserState.ERROR && _currentParserState != BindingURLParserState.BINDING_URL_END)
+ {
+ prevState = _currentParserState;
+ _currentParserState = next();
+ }
+
+ if (_currentParserState == BindingURLParserState.ERROR)
+ {
+ _error =
+ "Invalid URL format [current_state = " + prevState + ", details parsed so far " + _bindingURL + " ] error at (" + _index + ") due to " + _error;
+ _logger.debug(_error);
+ URISyntaxException ex;
+ ex = new URISyntaxException(markErrorLocation(),"Error occured while parsing URL",_index);
+ throw ex;
+ }
+
+ processOptions();
+ }
+ catch (ArrayIndexOutOfBoundsException e)
+ {
+ _error = "Invalid URL format [current_state = " + prevState + ", details parsed so far " + _bindingURL + " ] error at (" + _index + ")";
+ URISyntaxException ex = new URISyntaxException(markErrorLocation(),"Error occured while parsing URL",_index);
+ ex.initCause(e);
+ throw ex;
+ }
+ }
+
+ enum BindingURLParserState
+ {
+ BINDING_URL_START,
+ EXCHANGE_CLASS,
+ COLON_CHAR,
+ DOUBLE_SEP,
+ EXCHANGE_NAME,
+ EXCHANGE_SEPERATOR_CHAR,
+ DESTINATION,
+ DESTINATION_SEPERATOR_CHAR,
+ QUEUE_NAME,
+ QUESTION_MARK_CHAR,
+ PROPERTY_NAME,
+ PROPERTY_EQUALS,
+ START_PROPERTY_VALUE,
+ PROPERTY_VALUE,
+ END_PROPERTY_VALUE,
+ PROPERTY_SEPARATOR,
+ BINDING_URL_END,
+ ERROR
+ }
+
+ /**
+ * I am fully ware that there are few optimizations
+ * that can speed up things a wee bit. But I have opted
+ * for readability and maintainability at the expense of
+ * speed, as speed is not a critical factor here.
+ *
+ * One can understand the full parse logic by just looking at this method.
+ */
+ private BindingURLParserState next()
+ {
+ switch (_currentParserState)
+ {
+ case BINDING_URL_START:
+ return extractExchangeClass();
+ case COLON_CHAR:
+ _index++; //skip ":"
+ return BindingURLParserState.DOUBLE_SEP;
+ case DOUBLE_SEP:
+ _index = _index + 2; //skip "//"
+ return BindingURLParserState.EXCHANGE_NAME;
+ case EXCHANGE_NAME:
+ return extractExchangeName();
+ case EXCHANGE_SEPERATOR_CHAR:
+ _index++; // skip '/'
+ return BindingURLParserState.DESTINATION;
+ case DESTINATION:
+ return extractDestination();
+ case DESTINATION_SEPERATOR_CHAR:
+ _index++; // skip '/'
+ return BindingURLParserState.QUEUE_NAME;
+ case QUEUE_NAME:
+ return extractQueueName();
+ case QUESTION_MARK_CHAR:
+ _index++; // skip '?'
+ return BindingURLParserState.PROPERTY_NAME;
+ case PROPERTY_NAME:
+ return extractPropertyName();
+ case PROPERTY_EQUALS:
+ _index++; // skip the equal sign
+ return BindingURLParserState.START_PROPERTY_VALUE;
+ case START_PROPERTY_VALUE:
+ _index++; // skip the '\''
+ return BindingURLParserState.PROPERTY_VALUE;
+ case PROPERTY_VALUE:
+ return extractPropertyValue();
+ case END_PROPERTY_VALUE:
+ _index ++;
+ return checkEndOfURL();
+ case PROPERTY_SEPARATOR:
+ _index++; // skip '&'
+ return BindingURLParserState.PROPERTY_NAME;
+ default:
+ return BindingURLParserState.ERROR;
+ }
+ }
+
+ private BindingURLParserState extractExchangeClass()
+ {
+ char nextChar = _url[_index];
+
+ // check for the following special cases.
+ // "myQueue?durable='true'" or just "myQueue";
+
+ StringBuilder builder = new StringBuilder();
+ while (nextChar != COLON_CHAR && nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR)
+ {
+ builder.append(nextChar);
+ _index++;
+ nextChar = _url[_index];
+ }
+
+ // normal use case
+ if (nextChar == COLON_CHAR)
+ {
+ _bindingURL.setExchangeClass(builder.toString());
+ return BindingURLParserState.COLON_CHAR;
+ }
+ // "myQueue?durable='true'" use case
+ else if (nextChar == QUESTION_MARK_CHAR)
+ {
+ _bindingURL.setExchangeClass(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString());
+ _bindingURL.setExchangeName("");
+ _bindingURL.setQueueName(builder.toString());
+ return BindingURLParserState.QUESTION_MARK_CHAR;
+ }
+ else
+ {
+ _bindingURL.setExchangeClass(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString());
+ _bindingURL.setExchangeName("");
+ _bindingURL.setQueueName(builder.toString());
+ return BindingURLParserState.BINDING_URL_END;
+ }
+ }
+
+ private BindingURLParserState extractExchangeName()
+ {
+ char nextChar = _url[_index];
+ StringBuilder builder = new StringBuilder();
+ while (nextChar != FORWARD_SLASH_CHAR)
+ {
+ builder.append(nextChar);
+ _index++;
+ nextChar = _url[_index];
+ }
+
+ _bindingURL.setExchangeName(builder.toString());
+ return BindingURLParserState.EXCHANGE_SEPERATOR_CHAR;
+ }
+
+ private BindingURLParserState extractDestination()
+ {
+ char nextChar = _url[_index];
+
+ //The destination is and queue name are both optional
+ // This is checking for the case where both are not specified.
+ if (nextChar == QUESTION_MARK_CHAR)
+ {
+ return BindingURLParserState.QUESTION_MARK_CHAR;
+ }
+
+ StringBuilder builder = new StringBuilder();
+ while (nextChar != FORWARD_SLASH_CHAR && nextChar != QUESTION_MARK_CHAR)
+ {
+ builder.append(nextChar);
+ _index++;
+ nextChar = _url[_index];
+ }
+
+ // This is the case where the destination is explictily stated.
+ // ex direct://amq.direct/myDest/myQueue?option1='1' ... OR
+ // direct://amq.direct//myQueue?option1='1' ...
+ if (nextChar == FORWARD_SLASH_CHAR)
+ {
+ _bindingURL.setDestinationName(builder.toString());
+ return BindingURLParserState.DESTINATION_SEPERATOR_CHAR;
+ }
+ // This is the case where destination is not explictly stated.
+ // ex direct://amq.direct/myQueue?option1='1' ...
+ else
+ {
+ _bindingURL.setQueueName(builder.toString());
+ return BindingURLParserState.QUESTION_MARK_CHAR;
+ }
+ }
+
+ private BindingURLParserState extractQueueName()
+ {
+ char nextChar = _url[_index];
+ StringBuilder builder = new StringBuilder();
+ while (nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR)
+ {
+ builder.append(nextChar);
+ nextChar = _url[++_index];
+ }
+ _bindingURL.setQueueName(builder.toString());
+
+ if(nextChar == QUESTION_MARK_CHAR)
+ {
+ return BindingURLParserState.QUESTION_MARK_CHAR;
+ }
+ else
+ {
+ return BindingURLParserState.BINDING_URL_END;
+ }
+ }
+
+ private BindingURLParserState extractPropertyName()
+ {
+ StringBuilder builder = new StringBuilder();
+ char next = _url[_index];
+ while (next != PROPERTY_EQUALS_CHAR)
+ {
+ builder.append(next);
+ next = _url[++_index];
+ }
+ _currentPropName = builder.toString();
+
+ if (_currentPropName.trim().equals(""))
+ {
+ _error = "Property name cannot be empty";
+ return BindingURLParserState.ERROR;
+ }
+
+ return BindingURLParserState.PROPERTY_EQUALS;
+ }
+
+ private BindingURLParserState extractPropertyValue()
+ {
+ StringBuilder builder = new StringBuilder();
+ char next = _url[_index];
+ while (next != SINGLE_QUOTE_CHAR)
+ {
+ builder.append(next);
+ next = _url[++_index];
+ }
+ String propValue = builder.toString();
+
+ if (propValue.trim().equals(""))
+ {
+ _error = "Property values cannot be empty";
+ return BindingURLParserState.ERROR;
+ }
+ else
+ {
+ if (_options.containsKey(_currentPropName))
+ {
+ Object obj = _options.get(_currentPropName);
+ if (obj instanceof List)
+ {
+ List list = (List)obj;
+ list.add(propValue);
+ }
+ else // it has to be a string
+ {
+ List<String> list = new ArrayList();
+ list.add((String)obj);
+ list.add(propValue);
+ _options.put(_currentPropName, list);
+ }
+ }
+ else
+ {
+ _options.put(_currentPropName, propValue);
+ }
+
+
+ return BindingURLParserState.END_PROPERTY_VALUE;
+ }
+ }
+
+ private BindingURLParserState checkEndOfURL()
+ {
+ char nextChar = _url[_index];
+ if ( nextChar == END_OF_URL_MARKER_CHAR)
+ {
+ return BindingURLParserState.BINDING_URL_END;
+ }
+ else if (nextChar == PROPERTY_SEPARATOR_CHAR || nextChar == ALTERNATIVE_PROPERTY_SEPARATOR_CHAR)
+ {
+ return BindingURLParserState.PROPERTY_SEPARATOR;
+ }
+ else
+ {
+ return BindingURLParserState.ERROR;
+ }
+ }
+
+ private String markErrorLocation()
+ {
+ String tmp = String.valueOf(_url);
+ // length -1 to remove ENDOF URL marker
+ return tmp.substring(0,_index) + "^" + tmp.substring(_index+1> tmp.length()-1?tmp.length()-1:_index+1,tmp.length()-1);
+ }
+
+ private void processOptions() throws URISyntaxException
+ {
+// check for bindingKey
+ if (_options.containsKey(BindingURL.OPTION_BINDING_KEY) && _options.get(BindingURL.OPTION_BINDING_KEY) != null)
+ {
+ Object obj = _options.get(BindingURL.OPTION_BINDING_KEY);
+
+ if (obj instanceof String)
+ {
+ AMQShortString[] bindingKeys = new AMQShortString[]{new AMQShortString((String)obj)};
+ _bindingURL.setBindingKeys(bindingKeys);
+ }
+ else // it would be a list
+ {
+ List list = (List)obj;
+ AMQShortString[] bindingKeys = new AMQShortString[list.size()];
+ int i=0;
+ for (Iterator it = list.iterator(); it.hasNext();)
+ {
+ bindingKeys[i] = new AMQShortString((String)it.next());
+ i++;
+ }
+ _bindingURL.setBindingKeys(bindingKeys);
+ }
+
+ }
+ for (String key: _options.keySet())
+ {
+ // We want to skip the bindingKey list
+ if (_options.get(key) instanceof String)
+ {
+ _bindingURL.setOption(key, (String)_options.get(key));
+ }
+ }
+
+
+ // check if both a binding key and a routing key is specified.
+ if (_options.containsKey(BindingURL.OPTION_BINDING_KEY) && _options.containsKey(BindingURL.OPTION_ROUTING_KEY))
+ {
+ throw new URISyntaxException(String.valueOf(_url),"It is illegal to specify both a routingKey and a bindingKey in the same URL",-1);
+ }
+
+ // check for durable subscriptions
+ if (_bindingURL.getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
+ {
+ String queueName = null;
+ if (Boolean.parseBoolean(_bindingURL.getOption(BindingURL.OPTION_DURABLE)))
+ {
+ if (_bindingURL.containsOption(BindingURL.OPTION_CLIENTID) && _bindingURL.containsOption(BindingURL.OPTION_SUBSCRIPTION))
+ {
+ queueName = _bindingURL.getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION);
+ }
+ else
+ {
+ throw new URISyntaxException(String.valueOf(_url),"Durable subscription must have values for " + BindingURL.OPTION_CLIENTID
+ + " and " + BindingURL.OPTION_SUBSCRIPTION , -1);
+
+ }
+ }
+ _bindingURL.setQueueName(queueName);
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ String[] urls = new String[]
+ {
+ "topic://amq.topic//myTopic?routingkey='stocks.#'",
+ "topic://amq.topic/message_queue?bindingkey='usa.*'&bindingkey='control',exclusive='true'",
+ "topic://amq.topic//?bindingKey='usa.*',bindingkey='control',exclusive='true'",
+ "direct://amq.direct/dummyDest/myQueue?routingkey='abc.*'",
+ "exchange.Class://exchangeName/Destination/Queue",
+ "exchangeClass://exchangeName/Destination/?option='value',option2='value2'",
+ "IBMPerfQueue1?durable='true'",
+ "exchangeClass://exchangeName/Destination/?bindingkey='key1',bindingkey='key2'",
+ "exchangeClass://exchangeName/Destination/?bindingkey='key1'&routingkey='key2'"
+ };
+
+ try
+ {
+ for (String url: urls)
+ {
+ System.out.println("URL " + url);
+ AMQBindingURL bindingURL = new AMQBindingURL(url);
+ BindingURLParser parser = new BindingURLParser(url,bindingURL);
+ System.out.println("\nX " + bindingURL.toString() + " \n");
+
+ }
+
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 1da06ab92b..b5f2efdf01 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -158,6 +158,7 @@
</target>
<property name="test" value="*Test"/>
+ <property name="test.mem" value="512M"/>
<property name="log" value="info"/>
<property name="amqj.logging.level" value="${log}"/>
@@ -190,6 +191,8 @@
description="execute unit tests">
<junit fork="yes" haltonfailure="no" printsummary="on" timeout="600000">
+ <jvmarg value="-Xmx${test.mem}" />
+
<sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/>
<sysproperty key="root.logging.level" value="${root.logging.level}"/>
<sysproperty key="log4j.configuration" value="${log4j.configuration}"/>
diff --git a/qpid/python/mllib/dom.py b/qpid/python/mllib/dom.py
index 10b19d6db1..7c759dbdd5 100644
--- a/qpid/python/mllib/dom.py
+++ b/qpid/python/mllib/dom.py
@@ -72,7 +72,7 @@ class Dispatcher:
cls = cls.base
return False
- def dispatch(self, f):
+ def dispatch(self, f, attrs = ""):
cls = self
while cls != None:
if hasattr(f, cls.type):
@@ -81,7 +81,6 @@ class Dispatcher:
cls = cls.base
cls = self
- attrs = ""
while cls != None:
if attrs:
sep = ", "
@@ -151,9 +150,10 @@ class Tag(Node):
def dispatch(self, f):
try:
- method = getattr(f, "do_" + self.name)
+ attr = "do_" + self.name
+ method = getattr(f, attr)
except AttributeError:
- return Dispatcher.dispatch(self, f)
+ return Dispatcher.dispatch(self, f, "'%s'" % attr)
return method(self)
class Leaf(Component, Dispatcher):
diff --git a/qpid/python/qpid/queue.py b/qpid/python/qpid/queue.py
index af0565b6cc..00946a9156 100644
--- a/qpid/python/qpid/queue.py
+++ b/qpid/python/qpid/queue.py
@@ -24,36 +24,51 @@ content of a queue can be notified if the queue is no longer in use.
"""
from Queue import Queue as BaseQueue, Empty, Full
+from threading import Thread
class Closed(Exception): pass
class Queue(BaseQueue):
END = object()
+ STOP = object()
def __init__(self, *args, **kwargs):
BaseQueue.__init__(self, *args, **kwargs)
- self._real_put = self.put
- self.listener = self._real_put
+ self.listener = None
+ self.thread = None
def close(self):
self.put(Queue.END)
def get(self, block = True, timeout = None):
- self.put = self._real_put
- try:
- result = BaseQueue.get(self, block, timeout)
- if result == Queue.END:
- # this guarantees that any other waiting threads or any future
- # calls to get will also result in a Closed exception
- self.put(Queue.END)
- raise Closed()
- else:
- return result
- finally:
- self.put = self.listener
- pass
+ result = BaseQueue.get(self, block, timeout)
+ if result == Queue.END:
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Closed exception
+ self.put(Queue.END)
+ raise Closed()
+ else:
+ return result
def listen(self, listener):
self.listener = listener
- self.put = self.listener
+ if listener == None:
+ if self.thread != None:
+ self.put(Queue.STOP)
+ self.thread.join()
+ self.thread = None
+ else:
+ if self.thread == None:
+ self.thread = Thread(target = self.run)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ def run(self):
+ while True:
+ try:
+ o = self.get()
+ if o == Queue.STOP: break
+ self.listener(o)
+ except Closed:
+ break
diff --git a/qpid/python/tests/queue.py b/qpid/python/tests/queue.py
index d2e495d207..e12354eb43 100644
--- a/qpid/python/tests/queue.py
+++ b/qpid/python/tests/queue.py
@@ -30,37 +30,32 @@ class QueueTest (TestCase):
# all the queue functionality.
def test_listen(self):
- LISTEN = object()
- GET = object()
- EMPTY = object()
+ values = []
+ heard = threading.Event()
+ def listener(x):
+ values.append(x)
+ heard.set()
q = Queue(0)
- values = []
- q.listen(lambda x: values.append((LISTEN, x)))
+ q.listen(listener)
+ heard.clear()
q.put(1)
- assert values[-1] == (LISTEN, 1)
+ heard.wait()
+ assert values[-1] == 1
+ heard.clear()
q.put(2)
- assert values[-1] == (LISTEN, 2)
-
- class Getter(threading.Thread):
+ heard.wait()
+ assert values[-1] == 2
- def run(self):
- try:
- values.append((GET, q.get(timeout=10)))
- except Empty:
- values.append(EMPTY)
-
- g = Getter()
- g.start()
- # let the other thread reach the get
- time.sleep(2)
+ q.listen(None)
q.put(3)
- g.join()
-
- assert values[-1] == (GET, 3)
+ assert q.get(3) == 3
+ q.listen(listener)
+ heard.clear()
q.put(4)
- assert values[-1] == (LISTEN, 4)
+ heard.wait()
+ assert values[-1] == 4
def test_close(self):
q = Queue(0)