diff options
author | Aidan Skinner <aidan@apache.org> | 2008-02-28 16:11:52 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-02-28 16:11:52 +0000 |
commit | 603bff3fda0e820072b5309af0d15f2a7f049737 (patch) | |
tree | ed61c13f19761f8cebee23a7fd68a81a24bc676d | |
parent | 287d7af8671815d97b9db06d7f93292e6d18d58e (diff) | |
download | qpid-python-603bff3fda0e820072b5309af0d15f2a7f049737.tar.gz |
Merged revisions 630296,630353,630847,630852,630934,631002,631014,631128,631250,631486,631489-631490,631638,631657,631695,631697-631698,631740,631790,631823,631931 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk
........
r630296 | aconway | 2008-02-22 19:14:05 +0000 (Fri, 22 Feb 2008) | 2 lines
Provide separate name, message and error code on all Exceptions.
........
r630353 | aconway | 2008-02-22 23:13:43 +0000 (Fri, 22 Feb 2008) | 6 lines
Fixed rubygen to skip unchanged generated files, prevents needless
rebuilding every time the code generator changes.
Start of amqp 0-10 mapping, work in progress.
........
r630847 | arnaudsimon | 2008-02-25 13:45:53 +0000 (Mon, 25 Feb 2008) | 1 line
Changed checkPreConditions for accessing directly to _messageSelector
........
r630852 | arnaudsimon | 2008-02-25 13:50:46 +0000 (Mon, 25 Feb 2008) | 1 line
added prop jvmarg, that could be used allocate more memory: -Djvmarg=-Xmx512m
........
r630934 | gsim | 2008-02-25 16:56:29 +0000 (Mon, 25 Feb 2008) | 3 lines
Some refactoring of the 0-10 codepath (being migrated to final spec) that primarily colocates the current session and execution layers to facilitate implementing the new session layer that will now encompass this behaviour.
........
r631002 | rhs | 2008-02-25 21:29:55 +0000 (Mon, 25 Feb 2008) | 1 line
put queue listeners in their own thread
........
r631014 | aconway | 2008-02-25 21:56:03 +0000 (Mon, 25 Feb 2008) | 2 lines
Fix build problem if openais is installed but openais-devel is not.
........
r631128 | gsim | 2008-02-26 08:40:13 +0000 (Tue, 26 Feb 2008) | 3 lines
Use sessions reference to broker rather than connections in semantic state as the sessions ref is valid even when not attached.
........
r631250 | rhs | 2008-02-26 15:20:35 +0000 (Tue, 26 Feb 2008) | 1 line
jvmarg -> test.mem; this fixes build breakage when jvmarg is empty or unspecified
........
r631486 | rajith | 2008-02-27 05:15:20 +0000 (Wed, 27 Feb 2008) | 2 lines
This contains the ground work for QPID-803.
........
r631489 | rajith | 2008-02-27 05:17:07 +0000 (Wed, 27 Feb 2008) | 5 lines
Added a new parser for the BindingURL.
This allows adding multiple binding keys, using # and * in the URLs.
This is tracked via QPID-814
........
r631490 | rajith | 2008-02-27 05:17:46 +0000 (Wed, 27 Feb 2008) | 1 line
added a test case for the multiple binding key case
........
r631638 | aconway | 2008-02-27 16:37:48 +0000 (Wed, 27 Feb 2008) | 3 lines
Generate code for both 0-99 preview and 0-10 final specs .
0-10 final: extended code generation and non-generated support classes.
........
r631657 | rhs | 2008-02-27 17:18:40 +0000 (Wed, 27 Feb 2008) | 1 line
improved error message
........
r631695 | rajith | 2008-02-27 19:56:58 +0000 (Wed, 27 Feb 2008) | 5 lines
Correct the constant OPTION_BINDING_KEY that resulted in a test case failure
Modified the AMQBindingURL to fix an error in the toString method
Added more test cases to the main method in the BindingURLParser for quick testing, these cases are also present as unit tests.
........
r631697 | rajith | 2008-02-27 19:59:13 +0000 (Wed, 27 Feb 2008) | 2 lines
Added another test to check for URISyntaxException when both routingkey and bindingkey is specified
........
r631698 | rajith | 2008-02-27 20:01:12 +0000 (Wed, 27 Feb 2008) | 3 lines
Fixed the toString method to avoid printing both a routingkey and bindingkey and also to properly printout the bindingkeys when required.
This bug caused 3 test failures.
........
r631740 | aconway | 2008-02-27 21:49:04 +0000 (Wed, 27 Feb 2008) | 4 lines
Generating domains, structs, commands and controls for 0-10 final spec.
Not yet generating: holders, visitors.
........
r631790 | aconway | 2008-02-28 00:32:56 +0000 (Thu, 28 Feb 2008) | 3 lines
Added missing generated files to distribution.
........
r631823 | aconway | 2008-02-28 03:08:42 +0000 (Thu, 28 Feb 2008) | 3 lines
Added missing generated files to RPM.
........
r631931 | arnaudsimon | 2008-02-28 11:17:50 +0000 (Thu, 28 Feb 2008) | 1 line
See Qpid-817
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@632040 13f79535-47bb-0310-9956-ffa450edef68
76 files changed, 2345 insertions, 548 deletions
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index 3d68ed7890..c17051d431 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -122,11 +122,11 @@ test -n "$RUBY" && generate=yes test -z "$RUBY" && AC_MSG_ERROR([Missing ruby installation (try "yum install ruby").]) specdir=`pwd`/$srcdir/../specs -AMQP_XML=$specdir/amqp.0-10-preview.xml -AC_SUBST(AMQP_XML) -ls $AMQP_XML >/dev/null 2>&1 || generate=no - -AM_CONDITIONAL([GENERATE], [test x$generate = xyes]) +AMQP_PREVIEW_XML=$specdir/amqp.0-10-preview.xml +AMQP_FINAL_XML=$specdir/amqp.0-10.xml +AC_SUBST(AMQP_PREVIEW_XML) +AC_SUBST(AMQP_FINAL_XML) +AM_CONDITIONAL([GENERATE], [ls $AMQP_FINAL_XML >/dev/null]) # URL and download URL for the package. URL=http://rhm.et.redhat.com/qpidc @@ -139,7 +139,6 @@ AC_CHECK_HEADERS([boost/shared_ptr.hpp uuid/uuid.h],, AC_MSG_ERROR([Missing required header files.])) # Check for optional CPG requirement. -save_ldflags=$LDFLAGS LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais" AC_ARG_WITH([cpg], @@ -157,15 +156,13 @@ AC_ARG_WITH([cpg], esac], [ # not specified - enable if libs/headers available. with_CPG=yes - AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no]) AC_CHECK_HEADERS([openais/cpg.h],,[with_CPG=no]) + AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no]) ] ) AM_CONDITIONAL([CPG], [test x$with_CPG = xyes]) if test x$with_CPG = xyes; then CPPFLAGS+=" -DCPG" -else - LDFLAGS=$save_ldflags fi # Files to generate diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in index 167247f67b..fd4a24b2e5 100644 --- a/qpid/cpp/qpidc.spec.in +++ b/qpid/cpp/qpidc.spec.in @@ -104,6 +104,7 @@ make check %files devel %defattr(-,root,root,-) %_includedir/qpid/*.h +%_includedir/qpid/amqp_0_10 %_includedir/qpid/client %_includedir/qpid/framing %_includedir/qpid/sys diff --git a/qpid/cpp/rubygen/0-10/specification.rb b/qpid/cpp/rubygen/0-10/specification.rb new file mode 100755 index 0000000000..026b49e1a9 --- /dev/null +++ b/qpid/cpp/rubygen/0-10/specification.rb @@ -0,0 +1,177 @@ +#!/usr/bin/env ruby +$: << ".." # Include .. in load path +require 'cppgen' + +class Specification < CppGen + def initialize(outdir, amqp) + super(outdir, amqp) + @ns="qpid::amqp_#{@amqp.version.bars}" + @dir="qpid/amqp_#{@amqp.version.bars}" + end + + # domains + + def domain_h(d) + genl + typename=d.name.typename + if d.enum + scope("enum #{typename} {", "};") { + genl d.enum.choices.map { |c| + "#{c.name.constname} = #{c.value}" }.join(",\n") + } + elsif (d.type_ == "array") + genl "typedef Array<#{ArrayTypes[d.name].amqp2cpp}> #{typename};" + else + genl "typedef #{d.type_.amqp2cpp} #{typename};" + end + end + + # class constants + + def class_h(c) + genl "const uint8_t CODE=#{c.code};" + genl "extern const char* NAME;" + end + + def class_cpp(c) + genl "const char* NAME=\"#{c.fqname}\";" + end + + # Used by structs, commands and controls. + def action_struct_h(x, base, consts, &block) + genl + struct(x.classname, "public #{base}") { + x.fields.each { |f| genl "#{f.type_.amqp2cpp} #{f.cppname};" } + genl + genl "static const char* NAME;" + consts.each { |c| genl "static const uint8_t #{c.upcase}=#{x.send c or 0};"} + genl "static const uint8_t CLASS_CODE=#{x.containing_class.nsname}::CODE;" + genl + genl "#{x.classname}();" + scope("#{x.classname}(",");") { genl x.parameters } unless x.fields.empty? + genl + genl "void accept(Visitor&) const;" + genl + yield if block + } + end + + def action_struct_cpp(x) + genl + genl "const char* #{x.classname}::NAME=\"#{x.fqname}\";" + genl + genl "#{x.classname}::#{x.classname}() {}"; + genl + if not x.fields.empty? + scope("#{x.classname}::#{x.classname}(",") :") { genl x.parameters } + indent() { genl x.initializers } + genl "{}" + genl + end + scope("void #{x.classname}::accept(Visitor&) const {","}") { + genl "// FIXME aconway 2008-02-27: todo" + } + end + + # structs + + def struct_h(s) action_struct_h(s, "Struct", ["size","pack","code"]); end + def struct_cpp(s) action_struct_cpp(s) end + + # command and control + + def action_h(a) + action_struct_h(a, a.base, ["code"]) { + scope("template <class T> void invoke(T& target) {","}") { + scope("target.#{a.funcname}(", ");") { genl a.values } + } + genl + scope("template <class S> void serialize(S& s) {","}") { + gen "s" + a.fields.each { |f| gen "(#{f.cppname})"} + genl ";" + } unless a.fields.empty? + } + end + + def action_cpp(a) action_struct_cpp(a); end + + # Types that must be generated early because they are used by other types. + def pregenerate?(x) not @amqp.used_by[x.fqname].empty?; end + + # Generate the log + def gen_specification() + h_file("#{@dir}/specification") { + include "#{@dir}/built_in_types" + include "#{@dir}/helpers" + include "<boost/call_traits.hpp>" + genl "using boost::call_traits;" + namespace(@ns) { + # Top level + @amqp.domains.each { |d| + # segment-type and track are are built in + domain_h d unless ["track","segment-type"].include?(d.name) + } + puts @amqp.used_by.inspect + + # Domains and structs that must be generated early because + # they are used by other definitions: + each_class_ns { |c| + class_h c + c.domains.each { |d| domain_h d if pregenerate? d } + c.structs.each { |s| struct_h s if pregenerate? s } + } + # Now dependent domains/structs and actions + each_class_ns { |c| + c.domains.each { |d| domain_h d if not pregenerate? d } + c.structs.each { |s| struct_h s if not pregenerate? s } + c.actions.each { |a| action_h a } + } + } + } + + cpp_file("#{@dir}/specification") { + include "#{@dir}/specification" + namespace(@ns) { + each_class_ns { |c| + class_cpp c + c.actions.each { |a| action_cpp a} + c.structs.each { |s| struct_cpp s } + } + } + } + end + + def gen_proxy() + h_file("#{@dir}/Proxy.h") { + include "#{@dir}/specification" + namespace(@ns) { + genl "template <class F, class R=F::result_type>" + cpp_class("ProxyTemplate") { + public + genl "ProxyTemplate(F f) : functor(f) {}" + @amqp.classes.each { |c| + c.actions.each { |a| + scope("R #{a.funcname}(", ")") { genl a.parameters } + scope() { + var=a.name.funcname + scope("#{a.classname} #{var}(",");") { genl a.arguments } + genl "return functor(#{var});" + } + } + } + private + genl "F functor;" + } + } + } + end + + def generate + gen_specification + gen_proxy + end +end + +Specification.new($outdir, $amqp).generate(); + diff --git a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb index 18f74a2e41..f9ef95f5a0 100755 --- a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb +++ b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb @@ -23,5 +23,5 @@ class MethodBodyConstVisitorGen < CppGen end end -MethodBodyConstVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyConstVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb index 4944b10a84..a74b0c06d6 100755 --- a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb +++ b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb @@ -31,5 +31,5 @@ class MethodBodyDefaultVisitorGen < CppGen end end -MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/MethodHolder.rb b/qpid/cpp/rubygen/99-0/MethodHolder.rb index 7e915677b2..a708db6676 100755 --- a/qpid/cpp/rubygen/templates/MethodHolder.rb +++ b/qpid/cpp/rubygen/99-0/MethodHolder.rb @@ -96,5 +96,5 @@ EOS end end -MethodHolderGen.new(Outdir, Amqp).generate(); +MethodHolderGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/Operations.rb b/qpid/cpp/rubygen/99-0/Operations.rb index 91007ef3e1..c985bb6105 100755 --- a/qpid/cpp/rubygen/templates/Operations.rb +++ b/qpid/cpp/rubygen/99-0/Operations.rb @@ -91,6 +91,6 @@ EOS end end -OperationsGen.new("client",ARGV[0], Amqp).generate() -OperationsGen.new("server",ARGV[0], Amqp).generate() +OperationsGen.new("client",ARGV[0], $amqp).generate() +OperationsGen.new("server",ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/OperationsInvoker.rb b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb index 747dd06189..642f98ce8e 100755 --- a/qpid/cpp/rubygen/templates/OperationsInvoker.rb +++ b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb @@ -88,5 +88,5 @@ class OperationsInvokerGen < CppGen end end -OperationsInvokerGen.new("client",ARGV[0], Amqp).generate() -OperationsInvokerGen.new("server",ARGV[0], Amqp).generate() +OperationsInvokerGen.new("client",ARGV[0], $amqp).generate() +OperationsInvokerGen.new("server",ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/Proxy.rb b/qpid/cpp/rubygen/99-0/Proxy.rb index 467476506c..2829884673 100755 --- a/qpid/cpp/rubygen/templates/Proxy.rb +++ b/qpid/cpp/rubygen/99-0/Proxy.rb @@ -62,7 +62,9 @@ EOS include "<sstream>" include "#{@classname}.h" include "qpid/framing/amqp_types_full.h" - Amqp.methods_on(@chassis).each { |m| include "qpid/framing/"+m.body_name } + @amqp.methods_on(@chassis).each { + |m| include "qpid/framing/"+m.body_name + } genl namespace("qpid::framing") { genl "#{@classname}::#{@classname}(FrameHandler& f) :" @@ -75,6 +77,6 @@ EOS end -ProxyGen.new("client", Outdir, Amqp).generate; -ProxyGen.new("server", Outdir, Amqp).generate; +ProxyGen.new("client", $outdir, $amqp).generate; +ProxyGen.new("server", $outdir, $amqp).generate; diff --git a/qpid/cpp/rubygen/templates/Session.rb b/qpid/cpp/rubygen/99-0/Session.rb index 6a50fdb462..e01a28a62d 100644 --- a/qpid/cpp/rubygen/templates/Session.rb +++ b/qpid/cpp/rubygen/99-0/Session.rb @@ -190,6 +190,6 @@ EOS end end -SessionNoKeywordGen.new(ARGV[0], Amqp).generate() -SessionGen.new(ARGV[0], Amqp).generate() +SessionNoKeywordGen.new(ARGV[0], $amqp).generate() +SessionGen.new(ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/all_method_bodies.rb b/qpid/cpp/rubygen/99-0/all_method_bodies.rb index d06f459493..5971d49189 100755 --- a/qpid/cpp/rubygen/templates/all_method_bodies.rb +++ b/qpid/cpp/rubygen/99-0/all_method_bodies.rb @@ -17,5 +17,5 @@ class AllMethodBodiesGen < CppGen end end -AllMethodBodiesGen.new(Outdir, Amqp).generate(); +AllMethodBodiesGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/constants.rb b/qpid/cpp/rubygen/99-0/constants.rb index 5fbbefe218..b5f559d504 100755 --- a/qpid/cpp/rubygen/templates/constants.rb +++ b/qpid/cpp/rubygen/99-0/constants.rb @@ -78,5 +78,5 @@ class ConstantsGen < CppGen end end -ConstantsGen.new(Outdir, Amqp).generate(); +ConstantsGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/frame_body_lists.rb b/qpid/cpp/rubygen/99-0/frame_body_lists.rb index 634001ab14..b20e4550f3 100644 --- a/qpid/cpp/rubygen/templates/frame_body_lists.rb +++ b/qpid/cpp/rubygen/99-0/frame_body_lists.rb @@ -26,6 +26,6 @@ EOS end end -FrameBodyListsGen.new(ARGV[0], Amqp).generate; +FrameBodyListsGen.new(ARGV[0], $amqp).generate; diff --git a/qpid/cpp/rubygen/templates/structs.rb b/qpid/cpp/rubygen/99-0/structs.rb index edbadb01f6..336591be00 100644 --- a/qpid/cpp/rubygen/templates/structs.rb +++ b/qpid/cpp/rubygen/99-0/structs.rb @@ -534,5 +534,5 @@ EOS end end -StructGen.new(ARGV[0], Amqp).generate() +StructGen.new(ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb index d1212153ca..1fff1d51db 100755 --- a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb +++ b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb @@ -30,5 +30,5 @@ class MethodBodyDefaultVisitorGen < CppGen end end -MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb index f998909ec8..b1e635a27b 100755 --- a/qpid/cpp/rubygen/amqpgen.rb +++ b/qpid/cpp/rubygen/amqpgen.rb @@ -119,7 +119,16 @@ class AmqpElement # List of children of type elname, or all children if elname # not specified. def children(elname=nil) - @cache_children[elname] ||= @children.select { |c| elname==c.xml.name } + if elname + @cache_children[elname] ||= @children.select { |c| elname==c.xml.name } + else + @children + end + end + + def each_descendant(&block) + yield self + @children.each { |c| c.each_descendant(&block) } end # Look up child of type elname with attribute name. @@ -127,6 +136,9 @@ class AmqpElement @cache_child[[elname,name]] ||= children(elname).find { |c| c.name==name } end + # Fully qualified amqp dotted name of this element + def dotted_name() (parent ? parent.dotted_name+"." : "") + name; end + # The root <amqp> element. def root() @root ||=parent ? parent.root : self; end @@ -140,9 +152,22 @@ class AmqpElement # Text of doc child if there is one. def doc() d=xml.elements["doc"]; d and d.text; end + def fqname() + throw "fqname: #{self} #{parent.fqname} has no name" unless name + p=parent && parent.fqname + p ? p+"."+name : name; + end + + def containing_class() + return self if is_a? AmqpClass + return parent && parent.containing_class + end + end -AmqpResponse = AmqpElement +class AmqpResponse < AmqpElement + def initialize(xml, parent) super; end +end class AmqpDoc < AmqpElement def initialize(xml,parent) super; end @@ -153,18 +178,32 @@ class AmqpChoice < AmqpElement def initialize(xml,parent) super; end amqp_attr_reader :name, :value end - + class AmqpEnum < AmqpElement def initialize(xml,parent) super; end amqp_child_reader :choice end +# 0-10 array domains are missing element type information, add it here. +ArrayTypes={ + "str16-array" => "str-16", + "amqp-host-array" => "connection.amqp-host-url", + "command-fragments" => "session.command-fragment", + "in-doubt" => "dtx.xid" +} + class AmqpDomain < AmqpElement - def initialize(xml, parent) super; end + def initialize(xml, parent) + super + root.used_by[uses].push(fqname) if uses and uses.index('.') + end + amqp_attr_reader :type amqp_single_child_reader :struct # preview amqp_single_child_reader :enum + def uses() type_=="array" ? ArrayTypes[name] : type_; end + def unalias() d=self while (d.type_ != d.name and root.domain(d.type_)) @@ -180,10 +219,13 @@ class AmqpException < AmqpElement end class AmqpField < AmqpElement - def initialize(xml, amqp) super; end; + def initialize(xml, amqp) + super; + root.used_by[type_].push(parent.fqname) if type_ and type_.index('.') + end + def domain() root.domain(xml.attributes["domain"]); end amqp_single_child_reader :struct # preview - # FIXME aconway 2008-02-21: exceptions in fields - need to update c++ mapping. amqp_child_reader :exception amqp_attr_reader :type, :default, :code, :required end @@ -198,15 +240,11 @@ class AmqpConstant < AmqpElement amqp_attr_reader :value, :class end -# FIXME aconway 2008-02-21: -# class AmqpResponse < AmqpElement -# def initialize(xml, parent) super; end -# end - class AmqpResult < AmqpElement def initialize(xml, parent) super; end amqp_single_child_reader :struct # preview amqp_attr_reader :type + def name() "result"; end end class AmqpEntry < AmqpElement @@ -236,8 +274,8 @@ class AmqpStruct < AmqpElement amqp_attr_reader :size, :code, :pack amqp_child_reader :field - alias :raw_pack :pack # preview - preview code needs default "short" for pack. + alias :raw_pack :pack def pack() raw_pack or (not parent.final? and "short"); end def result?() parent.xml.name == "result"; end def domain?() parent.xml.name == "domain"; end @@ -265,17 +303,21 @@ class AmqpRole < AmqpElement amqp_attr_reader :implement end -class AmqpControl < AmqpElement +# Base class for command and control. +class AmqpAction < AmqpElement def initialize(xml,amqp) super; end amqp_child_reader :implement, :field, :response amqp_attr_reader :code end -class AmqpCommand < AmqpElement +class AmqpControl < AmqpAction + def initialize(xml,amqp) super; end +end + +class AmqpCommand < AmqpAction def initialize(xml,amqp) super; end - amqp_child_reader :implement, :field, :exception, :response + amqp_child_reader :exception amqp_single_child_reader :result, :segments - amqp_attr_reader :code end class AmqpClass < AmqpElement @@ -296,12 +338,19 @@ class AmqpClass < AmqpElement def l4?() # preview !["connection", "session", "execution"].include?(name) end + + def actions() controls+commands; end end class AmqpType < AmqpElement + def initialize(xml,amqp) super; end amqp_attr_reader :code, :fixed_width, :variable_width end +class AmqpXref < AmqpElement + def initialize(xml,amqp) super; end +end + # AMQP root element. class AmqpRoot < AmqpElement amqp_attr_reader :major, :minor, :port, :comment @@ -314,14 +363,16 @@ class AmqpRoot < AmqpElement raise "No XML spec files." if specs.empty? xml=parse(specs.shift) specs.each { |s| xml_merge(xml, parse(s)) } + @used_by=Hash.new{ |h,k| h[k]=[] } super(xml, nil) end + attr_reader :used_by + + def merge(root) xml_merge(xml, root.xml); end + def version() major + "-" + minor; end - # Find a child node from a dotted amqp name, e.g. message.transfer - def lookup(dotted_name) elements[dotted_name.gsub(/\./,"/")]; end - # preview - only struct child reader remains for new mapping def domain_structs() domains.map{ |d| d.struct }.compact; end def result_structs() @@ -337,6 +388,8 @@ class AmqpRoot < AmqpElement @methods_on[chassis] ||= classes.map { |c| c.methods_on(chassis) }.flatten end + def fqname() nil; end + # TODO aconway 2008-02-21: methods by role. private @@ -353,7 +406,7 @@ end # Collect information about generated files. class GenFiles @@files =[] - def GenFiles.add(f) @@files << f; puts f; end + def GenFiles.add(f) @@files << f; end def GenFiles.get() @@files; end end @@ -366,26 +419,38 @@ class Generator def initialize (outdir, amqp) @amqp=amqp @outdir=outdir - @prefix='' # For indentation or comments. + @prefix=[''] # For indentation or comments. @indentstr=' ' # One indent level. @outdent=2 - Pathname.new(@outdir).mkpath unless @outdir=="-" or File.directory?(@outdir) + Pathname.new(@outdir).mkpath unless @outdir=="-" end # Create a new file, set @out. - def file(file) + def file(file, &block) GenFiles.add file if (@outdir != "-") - path=Pathname.new "#{@outdir}/#{file}" - path.parent.mkpath - path.open('w') { |@out| yield } + @path=Pathname.new "#{@outdir}/#{file}" + @path.parent.mkpath + @out=String.new # Generate in memory first + if block then yield; endfile; end + end + end + + def endfile() + if @outdir != "-" + if @path.exist? and @path.read == @out + puts "Skipped #{@path} - unchanged" # Dont generate if unchanged + else + @path.open('w') { |f| f << @out } + puts "Generated #{@path}" + end end end # Append multi-line string to generated code, prefixing each line. def gen (str) str.each_line { |line| - @out << @prefix unless @midline + @out << @prefix.last unless @midline @out << line @midline = nil } @@ -400,23 +465,25 @@ class Generator end # Generate code with added prefix. - def prefix(add) - save=@prefix - @prefix+=add - yield - @prefix=save + def prefix(add, &block) + @prefix.push @prefix.last+add + if block then yield; endprefix; end + end + + def endprefix() + @prefix.pop end # Generate indented code def indent(n=1,&block) prefix(@indentstr * n,&block); end + alias :endindent :endprefix # Generate outdented code def outdent(&block) - save=@prefix - @prefix=@prefix[0...-2] - yield - @prefix=save + @prefix.push @prefix.last[0...-2] + if block then yield; endprefix; end end + alias :endoutdent :endprefix attr_accessor :out end diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb index 60a653e18d..edee72e0bd 100755 --- a/qpid/cpp/rubygen/cppgen.rb +++ b/qpid/cpp/rubygen/cppgen.rb @@ -54,12 +54,31 @@ CppKeywords = Set.new(["and", "and_eq", "asm", "auto", "bitand", CppMangle = CppKeywords+Set.new(["string"]) class String - def cppsafe() - CppMangle.include?(self) ? self+"_" : self + def cppsafe() CppMangle.include?(self) ? self+"_" : self; end + + def amqp2cpp() + path=split(".") + name=path.pop + return name.typename if path.empty? + path.map! { |n| n.nsname } + return (path << name.caps).join("::") end + + alias :typename :caps + alias :nsname :bars + alias :constname :shout + alias :funcname :lcaps + alias :varname :lcaps end # Hold information about a C++ type. +# +# preview - new mapping does not use CppType, +# Each amqp type corresponds exactly by dotted name +# to a type, domain or struct, which in turns +# corresponds by name to a C++ type or typedef. +# (see String.amqp2cpp) +# class CppType def initialize(name) @name=@param=@ret=name; end attr_reader :name, :param, :ret, :code @@ -93,11 +112,22 @@ class CppType def to_s() name; end; end +class AmqpElement + def cppfqname() + names=parent.dotted_name.split(".") + # Field children are moved up to method in C++b + prefix.pop if parent.is_a? AmqpField + prefix.push cppname + prefix.join("::") + end +end + class AmqpField def cppname() name.lcaps.cppsafe; end def cpptype() domain.cpptype; end def bit?() domain.type_ == "bit"; end def signature() cpptype.param+" "+cppname; end + def paramtype() "call_traits<#{type_.amqp2cpp}>::param_type"; end end class AmqpMethod @@ -107,8 +137,41 @@ class AmqpMethod def body_name() parent.name.caps+name.caps+"Body"; end end +module AmqpHasFields + def parameters() + fields.map { |f| "#{f.paramtype} #{f.cppname}_"}.join(",\n") + end + + def arguments() + fields.map { |f| "#{f.cppname}_"}.join(",\n") + end + + def values() + fields.map { |f| "#{f.cppname}"}.join(",\n") + end + + def initializers() + fields.map { |f| "#{f.cppname}(#{f.cppname}_)"}.join(",\n") + end +end + +class AmqpAction + def classname() name.typename; end + def funcname() parent.name.funcname + name.caps; end + include AmqpHasFields +end + +class AmqpCommand < AmqpAction + def base() "Command"; end +end + +class AmqpControl < AmqpAction + def base() "Control"; end +end + class AmqpClass - def cppname() name.caps; end + def cppname() name.caps; end # preview + def nsname() name.nsname; end end class AmqpDomain @@ -150,18 +213,26 @@ class AmqpResult end class AmqpStruct - def cpp_pack_type() AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t"); end - def cpptype() parent.cpptype; end - def cppname() cpptype.name; end + include AmqpHasFields + + def cpp_pack_type() # preview + AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t"); + end + def cpptype() parent.cpptype; end # preview + def cppname() cpptype.name; end # preview + + def classname() name.typename; end end class CppGen < Generator def initialize(outdir, *specs) super(outdir,*specs) + # need to sort classes for dependencies + @actions=[] # Stack of end-scope actions end # Write a header file. - def h_file(path) + def h_file(path, &block) path = (/\.h$/ === path ? path : path+".h") guard=path.upcase.tr('./-','_') file(path) { @@ -169,12 +240,12 @@ class CppGen < Generator gen "#define #{guard}\n" gen Copyright yield - gen "#endif /*!#{guard}*/\n" + gen "#endif /*!#{guard}*/\n" } end # Write a .cpp file. - def cpp_file(path) + def cpp_file(path, &block) path = (/\.cpp$/ === path ? path : path+".cpp") file(path) do gen Copyright @@ -188,10 +259,12 @@ class CppGen < Generator genl "#include #{header}" end - def scope(open="{",close="}", &block) - genl open; indent(&block); genl close + def scope(open="{",close="}", &block) + genl open + indent &block + genl close end - + def namespace(name, &block) genl names = name.split("::") @@ -210,7 +283,7 @@ class CppGen < Generator indent { gen "#{bases.join(",\n")}" } end genl - scope("{","};") { yield } + scope("{","};", &block) end def struct(name, *bases, &block) @@ -242,6 +315,11 @@ class CppGen < Generator prefix(" * ",&block) genl " */" end + + # Generate code in namespace for each class + def each_class_ns() + @amqp.classes.each { |c| namespace(c.nsname) { yield c } } + end end # Fully-qualified class name diff --git a/qpid/cpp/rubygen/generate b/qpid/cpp/rubygen/generate index 94b194aaa4..d094be4f41 100755 --- a/qpid/cpp/rubygen/generate +++ b/qpid/cpp/rubygen/generate @@ -1,5 +1,6 @@ #!/usr/bin/env ruby require 'amqpgen' +require 'pathname' # # Run a set of code generation templates. @@ -18,17 +19,39 @@ EOS exit 1 end -Outdir=ARGV[0] -Specs=ARGV.grep(/\.xml$/) -Amqp=AmqpRoot.new(*Specs) +# Create array of specs by version +def parse_specs(specs) + roots={ } + specs.each { |spec| + root=AmqpRoot.new(spec) + ver=root.version + if (roots[ver]) + roots[ver].merge(root) + else + roots[ver]=root + end + } + roots +end # Run selected templates if ARGV.any? { |arg| arg=="all" } - templates=Dir["#{File.dirname __FILE__}/templates/*.rb"] + templates=Dir["#{File.dirname __FILE__}/*/*.rb"] else templates=ARGV.grep(/\.rb$/) end -templates.each { |t| load t } + +$outdir=ARGV[0] +$models=parse_specs(ARGV.grep(/\.xml$/)) +templates.each { |t| + ver=Pathname.new(t).dirname.basename.to_s + $amqp=$models[ver] + if $amqp + load t + else + puts "Warning: skipping #{t}, no spec file for version #{ver}." + end +} def make_continue(lines) lines.join(" \\\n "); end @@ -40,7 +63,7 @@ if makefile generator_files=Dir["**/*.rb"] << File.basename(__FILE__) Dir.chdir dir rgen_generator=generator_files.map{ |f| "$(rgen_dir)/#{f}" } - rgen_srcs=GenFiles.get.map{ |f| "#{Outdir}/#{f}" } + rgen_srcs=GenFiles.get.map{ |f| "#{$outdir}/#{f}" } File.open(makefile, 'w') { |out| out << <<EOS @@ -51,13 +74,13 @@ rgen_generator=#{make_continue rgen_generator} rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))} -rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))} +rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r{qpid/(framing|amqp_.+)/.+\.cpp$}))} rgen_srcs=#{make_continue rgen_srcs} # Header file install rules. EOS - ["framing", "client/no_keyword","client", "broker"].each { |ns| + ["amqp_0_10", "framing", "client/no_keyword","client", "broker"].each { |ns| dir="qpid/#{ns}" dir_ = dir.tr("/", "_") regex=%r|#{dir}/[^/]+\.h$| @@ -69,7 +92,7 @@ EOS } out << <<EOS if GENERATE -$(rgen_srcs) $(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs) +$(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs) $(rgen_cmd) # Empty rule in case a generator file is renamed/removed. diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index b44aaa7e5e..e3b95e045f 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -11,11 +11,13 @@ EXTRA_DIST= $(platform_dist) # This phony target is needed by generated makefile fragments: force: -# AMQP_XML is defined in ../configure.ac -specs=@AMQP_XML@ $(top_srcdir)/xml/cluster.xml - if GENERATE +# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac +amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml +amqp_0_10_xml=@AMQP_FINAL_XML@ +specs=$(amqp_99_0_xml) $(amqp_0_10_xml) + # Ruby generator. rgen_dir=$(top_srcdir)/rubygen rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate $(srcdir)/gen $(specs) all $(srcdir)/rubygen.mk @@ -101,6 +103,7 @@ libqpidcommon_la_LIBADD = \ libqpidcommon_la_SOURCES = \ $(rgen_common_cpp) \ $(platform_src) \ + qpid/amqp_0_10/helpers.cpp \ qpid/Serializer.h \ qpid/amqp_0_10/built_in_types.h \ qpid/amqp_0_10/Codec.h \ @@ -166,6 +169,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PreviewConnection.cpp \ qpid/broker/PreviewConnectionHandler.cpp \ qpid/broker/PreviewSessionHandler.cpp \ + qpid/broker/PreviewSessionManager.cpp \ + qpid/broker/PreviewSessionState.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ @@ -247,6 +252,7 @@ libqpidclient_la_SOURCES = \ nobase_include_HEADERS = \ $(platform_hdr) \ + qpid/amqp_0_10/helpers.h \ qpid/assert.h \ qpid/DataDir.h \ qpid/Exception.h \ @@ -270,6 +276,8 @@ nobase_include_HEADERS = \ qpid/broker/PreviewConnection.h \ qpid/broker/PreviewConnectionHandler.h \ qpid/broker/PreviewSessionHandler.h \ + qpid/broker/PreviewSessionManager.h \ + qpid/broker/PreviewSessionState.h \ qpid/broker/Connection.h \ qpid/broker/ConnectionState.h \ qpid/broker/ConnectionFactory.h \ diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp index 07f157cfc3..17f0d5029c 100644 --- a/qpid/cpp/src/qpid/Exception.cpp +++ b/qpid/cpp/src/qpid/Exception.cpp @@ -32,20 +32,31 @@ std::string strError(int err) { return std::string(strerror_r(err, buf, sizeof(buf))); } -Exception::Exception(const std::string& s) throw() : msg(s) { - QPID_LOG(warning, "Exception: " << msg); +Exception::Exception(const std::string& msg, + const std::string& nm, + uint16_t cd) throw() + : message(msg), name(nm), code(cd), + whatStr((name.empty() ? "" : name + ": ")+ msg) +{ + QPID_LOG(warning, "Exception: " << whatStr); } Exception::~Exception() throw() {} -std::string Exception::str() const throw() { - if (msg.empty()) - const_cast<std::string&>(msg).assign(typeid(*this).name()); - return msg; +std::string Exception::getMessage() const throw() { return message; } + +std::string Exception::getName() const throw() { + return name.empty() ? typeid(*this).name() : name; } -const char* Exception::what() const throw() { return str().c_str(); } +uint16_t Exception::getCode() const throw() { return code; } + +const char* Exception::what() const throw() { + if (whatStr.empty()) return typeid(*this).name(); + else return whatStr.c_str(); +} -const std::string ClosedException::CLOSED_MESSAGE("Closed"); +ClosedException::ClosedException(const std::string& msg) + : Exception(msg, "ClosedException") {} } // namespace qpid diff --git a/qpid/cpp/src/qpid/Exception.h b/qpid/cpp/src/qpid/Exception.h index 57c7a21234..00365018ba 100644 --- a/qpid/cpp/src/qpid/Exception.h +++ b/qpid/cpp/src/qpid/Exception.h @@ -40,13 +40,27 @@ std::string strError(int err); class Exception : public std::exception { public: - explicit Exception(const std::string& str=std::string()) throw(); + explicit Exception(const std::string& message=std::string(), + const std::string& name=std::string(), + uint16_t code=0) throw(); + virtual ~Exception() throw(); + + // returns "name: message" + virtual const char* what() const throw(); + + virtual std::string getName() const throw(); + virtual std::string getMessage() const throw(); + virtual uint16_t getCode() const throw(); + + // FIXME aconway 2008-02-21: backwards compat, remove? + std::string str() const throw() { return getMessage(); } - virtual const char *what() const throw(); - virtual std::string str() const throw(); private: - std::string msg; + const std::string message; + const std::string name; + const uint16_t code; + const std::string whatStr; }; struct ChannelException : public Exception { @@ -62,8 +76,7 @@ struct ConnectionException : public Exception { }; struct ClosedException : public Exception { - static const std::string CLOSED_MESSAGE; - ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {} + ClosedException(const std::string& msg=std::string()); }; } // namespace qpid diff --git a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h index 6cd9c72367..445f07459c 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h +++ b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h @@ -29,6 +29,7 @@ #include <boost/array.hpp> #include <stdint.h> #include <string> +#include <vector> /**@file Mapping from built-in AMQP types to C++ types */ @@ -66,7 +67,7 @@ typedef double Double; typedef float Float; typedef framing::SequenceNumber SequenceNo; using framing::Uuid; -typedef sys::AbsTime DateTime; +typedef sys::AbsTime Datetime; typedef Decimal<Uint8, Int32> Dec32; typedef Decimal<Uint8, Int64> Dec64; @@ -89,14 +90,18 @@ typedef CodableString<Uint16, Uint16> Str16Utf16; typedef CodableString<Uint8, Uint32> Vbin32; -/** FIXME aconway 2008-02-20: todo -byte-ranges 2 byte ranges within a 64-bit payload -sequence-set 2 ranged set representation -map 0xa8 4 a mapping of keys to typed values -list 0xa9 4 a series of consecutive type-value pairs -array 0xaa 4 a defined length collection of values of a single type -struct32 0xab 4 a coded struct with a 32-bit size -*/ +// FIXME aconway 2008-02-26: array encoding +template <class T> struct Array : public std::vector<T> {}; + +// FIXME aconway 2008-02-26: Unimplemented types: +struct ByteRanges {}; +struct SequenceSet {}; +struct Map {}; +struct List {}; +struct Struct32 {}; + +// Top level enum definitions. +enum SegmentType { CONTROL, COMMAND, HEADER, BODY }; }} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp new file mode 100644 index 0000000000..4333a2cd92 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "helpers.h" + +namespace qpid { +namespace amqp_0_10 { + +Control::~Control() {} +Command::~Command() {} +Struct::~Struct() {} + +}} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.h b/qpid/cpp/src/qpid/amqp_0_10/helpers.h new file mode 100644 index 0000000000..1769d374d9 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.h @@ -0,0 +1,67 @@ +#ifndef QPID_AMQP_0_10_HELPERS_H +#define QPID_AMQP_0_10_HELPERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the +n * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +namespace qpid { + +namespace amqp_0_10 { + +// Look up names by code +const char* getClassName(uint8_t code); +const char* getCommandName(uint8_t classCode, uint8_t code); +const char* getControlName(uint8_t classCode, uint8_t code); +const char* getStructName(uint8_t classCode, uint8_t code); + +struct Command { + virtual ~Command(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +struct Control { + virtual ~Control(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +struct Struct { + virtual ~Struct(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +/** Base class for generated enum domains. + * Enums map to classes for type safety and to provide separate namespaces + * for clashing values. + */ +struct Enum { + int value; + Enum(int v=0) : value(v) {} + operator int() const { return value; } + template <class S> void serialize(S &s) { s(value); } +}; + +}} // namespace qpid::amqp_0_10 + +#endif /*!QPID_AMQP_0_10_HELPERS_H*/ diff --git a/qpid/cpp/src/qpid/amqp_0_10/visitors.h b/qpid/cpp/src/qpid/amqp_0_10/visitors.h new file mode 100644 index 0000000000..3835f37f3e --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/visitors.h @@ -0,0 +1,15 @@ +// Visitors +template <class Base> struct Visitor; +template <class Base, class F, class R> FunctorVisitor; + +/** Template base implementation for visitables. */ +template <class Base, class Derived> +struct VisitableBase : public Base { + virtual void accept(Visitor<Derived>& v) { + v.visit(static_cast<Derived>&(*this)); + } + virtual void accept(Visitor<Derived>& v) const { + v.visit(static_cast<const Derived>&(*this)); + } +}; + diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 0a0eb0a0df..9bfa868d9c 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -109,7 +109,8 @@ Broker::Broker(const Broker::Options& conf) : store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), factory(*this), - sessionManager(conf.ack) + sessionManager(conf.ack), + previewSessionManager(conf.ack) { // Early-Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 55bc7644a5..153eabc6b3 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -30,6 +30,7 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" +#include "PreviewSessionManager.h" #include "Vhost.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" @@ -109,6 +110,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DataDir& getDataDir() { return dataDir; } SessionManager& getSessionManager() { return sessionManager; } + PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; } management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -136,6 +138,7 @@ class Broker : public sys::Runnable, public Plugin::Target, ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; + PreviewSessionManager previewSessionManager; management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h index c8c1e12f28..ef2c51bb8d 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h @@ -68,7 +68,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } - framing::ProtocolVersion getVersion() const { return session.getVersion();} + framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();} AccessHandler* getAccessHandler() { diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h index 410d400c9d..4c51e2a826 100644 --- a/qpid/cpp/src/qpid/broker/HandlerImpl.h +++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h @@ -20,7 +20,7 @@ */ #include "SemanticState.h" -#include "SessionState.h" +#include "SessionContext.h" #include "ConnectionState.h" namespace qpid { @@ -35,13 +35,13 @@ class Broker; class HandlerImpl { protected: SemanticState& state; - SessionState& session; + SessionContext& session; HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } ConnectionState& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getBroker(); } + Broker& getBroker() { return session.getConnection().getBroker(); } }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp index 19e6a235c4..36092bb7f6 100644 --- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp @@ -19,7 +19,7 @@ */ #include "PreviewSessionHandler.h" -#include "SessionState.h" +#include "PreviewSessionState.h" #include "PreviewConnection.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" @@ -36,7 +36,7 @@ using namespace std; using namespace qpid::sys; PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -106,15 +106,15 @@ void PreviewSessionHandler::assertClosed(const char* method) const { void PreviewSessionHandler::open(uint32_t detachedLifetime) { assertClosed("open"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); + std::auto_ptr<PreviewSessionState> state( + connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); peerSession.attached(session->getId(), session->getTimeout()); } void PreviewSessionHandler::resume(const Uuid& id) { assertClosed("resume"); - session = connection.broker.getSessionManager().resume(id); + session = connection.broker.getPreviewSessionManager().resume(id); session->attach(*this); SequenceNumber seq = session->resuming(); peerSession.attached(session->getId(), session->getTimeout()); @@ -154,7 +154,7 @@ void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) void PreviewSessionHandler::localSuspend() { if (session.get() && session->isAttached()) { session->detach(); - connection.broker.getSessionManager().suspend(session); + connection.broker.getPreviewSessionManager().suspend(session); session.reset(); } } @@ -171,7 +171,7 @@ void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark, const SequenceNumberSet& /*seenFrameSet*/) { assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { + if (session->getState() == PreviewSessionState::RESUMING) { session->receivedAck(cumulativeSeenMark); framing::SessionState::Replay replay=session->replay(); std::for_each(replay.begin(), replay.end(), @@ -193,14 +193,14 @@ void PreviewSessionHandler::solicitAck() { void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) { - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); + std::auto_ptr<PreviewSessionState> state( + connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); } void PreviewSessionHandler::detached() { - connection.broker.getSessionManager().suspend(session); + connection.broker.getPreviewSessionManager().suspend(session); session.reset(); } diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h index e1096ebf9f..4c517367d7 100644 --- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h +++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h @@ -36,7 +36,7 @@ namespace qpid { namespace broker { class PreviewConnection; -class SessionState; +class PreviewSessionState; /** * A SessionHandler is associated with each active channel. It @@ -45,7 +45,7 @@ class SessionState; */ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, - public SessionContext, + public framing::FrameHandler::InOutHandler, private boost::noncopyable { public: @@ -53,8 +53,8 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand ~PreviewSessionHandler(); /** Returns 0 if not attached to a session */ - SessionState* getSession() { return session.get(); } - const SessionState* getSession() const { return session.get(); } + PreviewSessionState* getSession() { return session.get(); } + const PreviewSessionState* getSession() const { return session.get(); } framing::ChannelId getChannel() const { return channel.get(); } @@ -101,7 +101,7 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand framing::AMQP_ClientProxy proxy; framing::AMQP_ClientProxy::Session peerSession; bool ignoring; - std::auto_ptr<SessionState> session; + std::auto_ptr<PreviewSessionState> session; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp new file mode 100644 index 0000000000..ec73082817 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PreviewSessionManager.h" +#include "PreviewSessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/log/Helpers.h" +#include "qpid/memory.h" + +#include <boost/bind.hpp> +#include <boost/range.hpp> + +#include <algorithm> +#include <functional> +#include <ostream> + +namespace qpid { +namespace broker { + +using namespace sys; +using namespace framing; + +PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {} + +PreviewSessionManager::~PreviewSessionManager() {} + +// FIXME aconway 2008-02-01: pass handler*, allow open unattached. +std::auto_ptr<PreviewSessionState> PreviewSessionManager::open( + PreviewSessionHandler& h, uint32_t timeout_) +{ + Mutex::ScopedLock l(lock); + std::auto_ptr<PreviewSessionState> session( + new PreviewSessionState(this, &h, timeout_, ack)); + active.insert(session->getId()); + for_each(observers.begin(), observers.end(), + boost::bind(&Observer::opened, _1,boost::ref(*session))); + return session; +} + +void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) { + Mutex::ScopedLock l(lock); + active.erase(session->getId()); + session->suspend(); + session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); + if (session->mgmtObject.get() != 0) + session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); + suspended.push_back(session.release()); // In expiry order + eraseExpired(); +} + +std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id) +{ + Mutex::ScopedLock l(lock); + eraseExpired(); + if (active.find(id) != active.end()) + throw SessionBusyException( + QPID_MSG("Session already active: " << id)); + Suspended::iterator i = std::find_if( + suspended.begin(), suspended.end(), + boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1)) + ); + if (i == suspended.end()) + throw InvalidArgumentException( + QPID_MSG("No suspended session with id=" << id)); + active.insert(id); + std::auto_ptr<PreviewSessionState> state(suspended.release(i).release()); + return state; +} + +void PreviewSessionManager::erase(const framing::Uuid& id) +{ + Mutex::ScopedLock l(lock); + active.erase(id); +} + +void PreviewSessionManager::eraseExpired() { + // Called with lock held. + if (!suspended.empty()) { + Suspended::iterator keep = std::lower_bound( + suspended.begin(), suspended.end(), now(), + boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2)); + if (suspended.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); + suspended.erase(suspended.begin(), keep); + } + } +} + +void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) { + observers.push_back(o); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.h b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h new file mode 100644 index 0000000000..65ca49ec89 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h @@ -0,0 +1,100 @@ +#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H +#define QPID_BROKER_PREVIEWSESSIONMANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/framing/Uuid.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/Mutex.h> +#include <qpid/RefCounted.h> + +#include <boost/noncopyable.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +#include <set> +#include <vector> +#include <memory> + +namespace qpid { +namespace broker { + +class PreviewSessionState; +class PreviewSessionHandler; + +/** + * Create and manage PreviewSessionState objects. + */ +class PreviewSessionManager : private boost::noncopyable { + public: + /** + * Observer notified of PreviewSessionManager events. + */ + struct Observer : public RefCounted { + virtual void opened(PreviewSessionState&) {} + }; + + PreviewSessionManager(uint32_t ack); + + ~PreviewSessionManager(); + + /** Open a new active session, caller takes ownership */ + std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_); + + /** Suspend a session, start it's timeout counter. + * The factory takes ownership. + */ + void suspend(std::auto_ptr<PreviewSessionState> session); + + /** Resume a suspended session. + *@throw Exception if timed out or non-existant. + */ + std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&); + + /** Add an Observer. */ + void add(const intrusive_ptr<Observer>&); + + private: + typedef boost::ptr_vector<PreviewSessionState> Suspended; + typedef std::set<framing::Uuid> Active; + typedef std::vector<intrusive_ptr<Observer> > Observers; + + void erase(const framing::Uuid&); + void eraseExpired(); + + sys::Mutex lock; + Suspended suspended; + Active active; + uint32_t ack; + Observers observers; + + friend class PreviewSessionState; // removes deleted sessions from active set. +}; + + + +}} // namespace qpid::broker + + + + + +#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/ diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp new file mode 100644 index 0000000000..7188ffbf40 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PreviewSessionState.h" +#include "PreviewSessionManager.h" +#include "PreviewSessionHandler.h" +#include "ConnectionState.h" +#include "Broker.h" +#include "SemanticHandler.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace broker { + +using namespace framing; +using sys::Mutex; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; + +PreviewSessionState::PreviewSessionState( + PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack) + : framing::SessionState(ack, timeout_ > 0), + factory(f), handler(h), id(true), timeout(timeout_), + broker(h->getConnection().broker), + version(h->getConnection().getVersion()), + semanticHandler(new SemanticHandler(*this)) +{ + in.next = semanticHandler.get(); + out.next = &handler->out; + + getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Session::shared_ptr + (new management::Session (this, parent, id.str ())); + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h->getChannel()); + mgmtObject->set_detachedLifespan (getTimeout()); + agent->addObject (mgmtObject); + } + } +} + +PreviewSessionState::~PreviewSessionState() { + // Remove ID from active session list. + if (factory) + factory->erase(getId()); + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +PreviewSessionHandler* PreviewSessionState::getHandler() { + return handler; +} + +AMQP_ClientProxy& PreviewSessionState::getProxy() { + assert(isAttached()); + return getHandler()->getProxy(); +} + +ConnectionState& PreviewSessionState::getConnection() { + assert(isAttached()); + return getHandler()->getConnection(); +} + +void PreviewSessionState::detach() { + getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + Mutex::ScopedLock l(lock); + handler = 0; out.next = 0; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (0); + } +} + +void PreviewSessionState::attach(PreviewSessionHandler& h) { + { + Mutex::ScopedLock l(lock); + handler = &h; + out.next = &handler->out; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + } + } + h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); +} + +void PreviewSessionState::activateOutput() +{ + Mutex::ScopedLock l(lock); + if (isAttached()) { + getConnection().outputTasks.activateOutput(); + } +} + //This class could be used as the callback for queue notifications + //if not attached, it can simply ignore the callback, else pass it + //on to the connection + +ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case management::Session::METHOD_DETACH : + if (handler != 0) + { + handler->detach(); + } + status = Manageable::STATUS_OK; + break; + + case management::Session::METHOD_CLOSE : + /* + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; + */ + + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.h b/qpid/cpp/src/qpid/broker/PreviewSessionState.h new file mode 100644 index 0000000000..6e8523317c --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.h @@ -0,0 +1,124 @@ +#ifndef QPID_BROKER_PREVIEWSESSION_H +#define QPID_BROKER_PREVIEWSESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Session.h" +#include "SessionContext.h" + +#include <boost/noncopyable.hpp> +#include <boost/scoped_ptr.hpp> + +#include <set> +#include <vector> +#include <ostream> + +namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + +namespace broker { + +class SemanticHandler; +class PreviewSessionHandler; +class PreviewSessionManager; +class Broker; +class ConnectionState; + +/** + * Broker-side session state includes sessions handler chains, which may + * themselves have state. + */ +class PreviewSessionState : public framing::SessionState, + public SessionContext, + public framing::FrameHandler::Chains, + public management::Manageable +{ + public: + ~PreviewSessionState(); + bool isAttached() { return handler; } + + void detach(); + void attach(PreviewSessionHandler& handler); + + + PreviewSessionHandler* getHandler(); + + /** @pre isAttached() */ + framing::AMQP_ClientProxy& getProxy(); + + /** @pre isAttached() */ + ConnectionState& getConnection(); + + uint32_t getTimeout() const { return timeout; } + Broker& getBroker() { return broker; } + framing::ProtocolVersion getVersion() const { return version; } + + /** OutputControl **/ + void activateOutput(); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + + // Normally SessionManager creates sessions. + PreviewSessionState(PreviewSessionManager*, + PreviewSessionHandler* out, + uint32_t timeout, + uint32_t ackInterval); + + + private: + PreviewSessionManager* factory; + PreviewSessionHandler* handler; + framing::Uuid id; + uint32_t timeout; + sys::AbsTime expiry; // Used by SessionManager. + Broker& broker; + framing::ProtocolVersion version; + sys::Mutex lock; + boost::scoped_ptr<SemanticHandler> semanticHandler; + management::Session::shared_ptr mgmtObject; + + friend class PreviewSessionManager; +}; + + +inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) { + return out << session.getId(); +} + +}} // namespace qpid::broker + + + +#endif /*!QPID_BROKER_SESSION_H*/ diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp index 2a79496144..fdde7ec18c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp @@ -22,7 +22,6 @@ #include "SemanticHandler.h" #include "SemanticState.h" #include "SessionContext.h" -#include "SessionState.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" #include "qpid/framing/ExecutionCompleteBody.h" @@ -37,9 +36,9 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(SessionState& s) : +SemanticHandler::SemanticHandler(SessionContext& s) : state(*this,s), session(s), - msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()), + msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()), ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2)) {} @@ -164,13 +163,8 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { - SessionContext* handler = session.getHandler(); - if (handler) { - uint32_t maxFrameSize = handler->getConnection().getFrameMax(); - MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize); - } else { - QPID_LOG(error, "Dropping message as session is no longer attached to a channel."); - } + uint32_t maxFrameSize = session.getConnection().getFrameMax(); + MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); return outgoing.hwm; } diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h index d7f3ec8799..893a0cbded 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.h +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h @@ -46,7 +46,7 @@ class AMQHeaderBody; namespace broker { -class SessionState; +class SessionContext; class SemanticHandler : public DeliveryAdapter, public framing::FrameHandler, @@ -56,7 +56,7 @@ class SemanticHandler : public DeliveryAdapter, typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; SemanticState state; - SessionState& session; + SessionContext& session; // TODO aconway 2007-09-20: Why are these on the handler rather than the // state? IncomingExecutionContext incoming; @@ -78,10 +78,10 @@ class SemanticHandler : public DeliveryAdapter, framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } //Connection& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getBroker(); } + Broker& getBroker() { return session.getConnection().getBroker(); } public: - SemanticHandler(SessionState& session); + SemanticHandler(SessionContext& session); //frame handler: void handle(framing::AMQFrame& frame); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 7b4035604f..9b44f31e14 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -19,7 +19,7 @@ * */ -#include "SessionState.h" +#include "SessionContext.h" #include "BrokerAdapter.h" #include "Queue.h" #include "Connection.h" @@ -56,7 +56,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::ptr_map; -SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) +SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) : session(ss), deliveryAdapter(da), prefetchSize(0), @@ -263,21 +263,16 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { - if (parent->getSession().isAttached() && accept(msg.payload)) { - allocateCredit(msg.payload); - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); - if (windowing || ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); - } - if (acquire && !ackExpected) { - queue->dequeue(0, msg.payload); - } - return true; - } else { - QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent); - return false; + allocateCredit(msg.payload); + DeliveryId deliveryTag = + parent->deliveryAdapter.deliver(msg, token); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + } + if (acquire && !ackExpected) { + queue->dequeue(0, msg.payload); } + return true; } bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) @@ -331,7 +326,7 @@ void SemanticState::cancel(ConsumerImpl& c) if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - Queue::tryAutoDelete(getSession().getBroker(), queue); + Queue::tryAutoDelete(session.getBroker(), queue); } } } @@ -352,7 +347,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); if (!cacheExchange || cacheExchange->getName() != exchangeName){ - cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName); + cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 7fc6e4167c..cc9c0e1e9b 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -45,7 +45,7 @@ namespace qpid { namespace broker { -class SessionState; +class SessionContext; /** * SemanticState holds the L3 and L4 state of an open session, whether @@ -98,7 +98,7 @@ class SemanticState : public framing::FrameHandler::Chains, typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; - SessionState& session; + SessionContext& session; DeliveryAdapter& deliveryAdapter; Queue::shared_ptr defaultQueue; ConsumerImplMap consumers; @@ -129,10 +129,10 @@ class SemanticState : public framing::FrameHandler::Chains, void cancel(ConsumerImpl&); public: - SemanticState(DeliveryAdapter&, SessionState&); + SemanticState(DeliveryAdapter&, SessionContext&); ~SemanticState(); - SessionState& getSession() { return session; } + SessionContext& getSession() { return session; } /** * Get named queue, never returns 0. diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index a27b43cf65..a289310b15 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -25,6 +25,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/OutputControl.h" #include "ConnectionState.h" @@ -33,17 +34,13 @@ namespace qpid { namespace broker { -class SessionContext : public framing::FrameHandler::InOutHandler +class SessionContext : public sys::OutputControl { public: - SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {} virtual ~SessionContext(){} virtual ConnectionState& getConnection() = 0; - virtual const ConnectionState& getConnection() const = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; - virtual const framing::AMQP_ClientProxy& getProxy() const = 0; - virtual void detach() = 0; - virtual framing::ChannelId getChannel() const = 0; + virtual Broker& getBroker() = 0; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 1cb10d0c19..0e3c9928d1 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,6 +21,7 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" +#include "ConnectionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" #include "qpid/framing/ClientInvoker.h" @@ -36,7 +37,7 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - return; - } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); - session->in.handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; - } else if (!ignoring) { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); + if (!ignoring) { + if (m && + (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || + invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + return; + } else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { + return; + } else { + throw ChannelErrorException( + QPID_MSG("Channel " << channel.get() << " is not open")); + } } } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. @@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) { } void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) + if (!session.get()) { + std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); + } } void SessionHandler::assertClosed(const char* method) const { @@ -208,4 +215,32 @@ void SessionHandler::detached() ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } +void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + assertAttached("complete"); + session->complete(cumulative, range); +} + +void SessionHandler::flush() +{ + assertAttached("flush"); + session->flush(); +} +void SessionHandler::sync() +{ + assertAttached("sync"); + session->sync(); +} + +void SessionHandler::noop() +{ + assertAttached("noop"); + session->noop(); +} + +void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //never actually sent by client at present +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 5a72bfb12d..e6bc463a82 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -28,7 +28,7 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelHandler.h" -#include "SessionContext.h" +#include "qpid/framing/SequenceNumber.h" #include <boost/noncopyable.hpp> @@ -36,16 +36,18 @@ namespace qpid { namespace broker { class Connection; +class ConnectionState; class SessionState; /** * A SessionHandler is associated with each active channel. It - * receives incoming frames, handles session commands and manages the + * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, - public SessionContext, + public framing::AMQP_ServerOperations::ExecutionHandler, + public framing::FrameHandler::InOutHandler, private boost::noncopyable { public: @@ -90,12 +92,17 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); void detached(); + //Execution methods: + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); + void flush(); + void noop(); + void result(uint32_t command, const std::string& data); + void sync(); void assertAttached(const char* method) const; void assertActive(const char* method) const; void assertClosed(const char* method) const; - Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp index aa7ac9a8bb..571d3365db 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.cpp +++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp @@ -45,7 +45,7 @@ SessionManager::~SessionManager() {} // FIXME aconway 2008-02-01: pass handler*, allow open unattached. std::auto_ptr<SessionState> SessionManager::open( - SessionContext& h, uint32_t timeout_) + SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); std::auto_ptr<SessionState> session( diff --git a/qpid/cpp/src/qpid/broker/SessionManager.h b/qpid/cpp/src/qpid/broker/SessionManager.h index 94956a83ed..7e8bd18f57 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.h +++ b/qpid/cpp/src/qpid/broker/SessionManager.h @@ -38,7 +38,7 @@ namespace qpid { namespace broker { class SessionState; -class SessionContext; +class SessionHandler; /** * Create and manage SessionState objects. @@ -57,7 +57,7 @@ class SessionManager : private boost::noncopyable { ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_); + std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_); /** Suspend a session, start it's timeout counter. * The factory takes ownership. diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index b6c59cfb3b..573a567da6 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -19,12 +19,16 @@ * */ #include "SessionState.h" -#include "SessionManager.h" -#include "SessionContext.h" -#include "ConnectionState.h" #include "Broker.h" +#include "ConnectionState.h" +#include "MessageDelivery.h" #include "SemanticHandler.h" +#include "SessionManager.h" +#include "SessionHandler.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/ServerInvoker.h" + +#include <boost/bind.hpp> namespace qpid { namespace broker { @@ -37,17 +41,17 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) : framing::SessionState(ack, timeout_ > 0), factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), version(h->getConnection().getVersion()), - semanticHandler(new SemanticHandler(*this)) + semanticState(*this, *this), + adapter(semanticState), + msgBuilder(&broker.getStore(), broker.getStagingThreshold()), + ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) { - in.next = semanticHandler.get(); - out.next = &handler->out; - - getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.addOutputTask(&semanticState); Manageable* parent = broker.GetVhostObject (); @@ -76,7 +80,7 @@ SessionState::~SessionState() { mgmtObject->resourceDestroy (); } -SessionContext* SessionState::getHandler() { +SessionHandler* SessionState::getHandler() { return handler; } @@ -91,20 +95,19 @@ ConnectionState& SessionState::getConnection() { } void SessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.removeOutputTask(&semanticState); Mutex::ScopedLock l(lock); - handler = 0; out.next = 0; + handler = 0; if (mgmtObject.get() != 0) { mgmtObject->set_attached (0); } } -void SessionState::attach(SessionContext& h) { +void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; - out.next = &handler->out; if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); @@ -112,7 +115,7 @@ void SessionState::attach(SessionContext& h) { mgmtObject->set_channelId (h.getChannel()); } } - h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + h.getConnection().outputTasks.addOutputTask(&semanticState); } void SessionState::activateOutput() @@ -165,5 +168,100 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } +void SessionState::handleCommand(framing::AMQMethodBody* method) +{ + SequenceNumber id = incoming.next(); + Invoker::Result invocation = invoke(adapter, *method); + incoming.complete(id); + + if (!invocation.wasHandled()) { + throw NotImplementedException("Not implemented"); + } else if (invocation.hasResult()) { + getProxy().getExecution().result(id.getValue(), invocation.getResult()); + } + if (method->isSync()) { + incoming.sync(id); + sendCompletion(); + } + //TODO: if window gets too large send unsolicited completion +} + +void SessionState::handleContent(AMQFrame& frame) +{ + intrusive_ptr<Message> msg(msgBuilder.getMessage()); + if (!msg) {//start of frameset will be indicated by frame flags + msgBuilder.start(incoming.next()); + msg = msgBuilder.getMessage(); + } + msgBuilder.handle(frame); + if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags + msg->setPublisher(&getConnection()); + semanticState.handle(msg); + msgBuilder.end(); + incoming.track(msg); + if (msg->getFrames().getMethod()->isSync()) { + incoming.sync(msg->getCommandId()); + sendCompletion(); + } + } +} + +void SessionState::handle(AMQFrame& frame) +{ + //TODO: make command handling more uniform, regardless of whether + //commands carry content. (For now, assume all single frame + //assmblies are non-content bearing and all content-bearing + //assmeblies will have more than one frame): + if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod()); + } else { + handleContent(frame); + } + +} + +DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +{ + uint32_t maxFrameSize = getConnection().getFrameMax(); + MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); + return outgoing.hwm; +} + +void SessionState::sendCompletion() +{ + SequenceNumber mark = incoming.getMark(); + SequenceNumberSet range = incoming.getRange(); + getProxy().getExecution().complete(mark.getValue(), range); +} + +void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + //record: + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + //ack messages: + semanticState.ackCumulative(mark.getValue()); + } + range.processRanges(ackOp); +} + +void SessionState::flush() +{ + incoming.flush(); + sendCompletion(); +} + +void SessionState::sync() +{ + incoming.sync(); + sendCompletion(); +} + +void SessionState::noop() +{ + incoming.noop(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 8a12e580b7..98c21a8ab5 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -27,10 +27,15 @@ #include "qpid/framing/SessionState.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/OutputControl.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" #include "qpid/management/Session.h" +#include "BrokerAdapter.h" +#include "DeliveryAdapter.h" +#include "MessageBuilder.h" +#include "SessionContext.h" +#include "SemanticState.h" +#include "IncomingExecutionContext.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -47,8 +52,7 @@ class AMQP_ClientProxy; namespace broker { -class SemanticHandler; -class SessionContext; +class SessionHandler; class SessionManager; class Broker; class ConnectionState; @@ -58,8 +62,8 @@ class ConnectionState; * themselves have state. */ class SessionState : public framing::SessionState, - public framing::FrameHandler::Chains, - public sys::OutputControl, + public SessionContext, + public DeliveryAdapter, public management::Manageable { public: @@ -67,10 +71,10 @@ class SessionState : public framing::SessionState, bool isAttached() { return handler; } void detach(); - void attach(SessionContext& handler); + void attach(SessionHandler& handler); - SessionContext* getHandler(); + SessionHandler* getHandler(); /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); @@ -85,6 +89,19 @@ class SessionState : public framing::SessionState, /** OutputControl **/ void activateOutput(); + void handle(framing::AMQFrame& frame); + void handleCommand(framing::AMQMethodBody* method); + void handleContent(framing::AMQFrame& frame); + + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); + void flush(); + void noop(); + void sync(); + void sendCompletion(); + + //delivery adapter methods: + DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); + // Manageable entry points management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t @@ -92,21 +109,32 @@ class SessionState : public framing::SessionState, // Normally SessionManager creates sessions. SessionState(SessionManager*, - SessionContext* out, + SessionHandler* out, uint32_t timeout, uint32_t ackInterval); private: + typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + SessionManager* factory; - SessionContext* handler; + SessionHandler* handler; framing::Uuid id; uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. Broker& broker; framing::ProtocolVersion version; sys::Mutex lock; - boost::scoped_ptr<SemanticHandler> semanticHandler; + + SemanticState semanticState; + BrokerAdapter adapter; + MessageBuilder msgBuilder; + + //execution state + IncomingExecutionContext incoming; + framing::Window outgoing; + RangedOperation ackOp; + management::Session::shared_ptr mgmtObject; friend class SessionManager; diff --git a/qpid/cpp/src/qpid/client/Session.h b/qpid/cpp/src/qpid/client/Session.h index 3293af60fe..5d91f289e2 100644 --- a/qpid/cpp/src/qpid/client/Session.h +++ b/qpid/cpp/src/qpid/client/Session.h @@ -27,7 +27,7 @@ namespace qpid { namespace client { /** - * Session is currently just an alias for Session_0_10 + * Session is currently just an alias for Session_99_0 * * \ingroup clientapi */ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index bca6c49c13..5152aa2e43 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -17,7 +17,7 @@ */ #include "Cluster.h" -#include "qpid/broker/SessionState.h" +#include "qpid/broker/PreviewSessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -32,18 +32,18 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::SessionState; +using broker::PreviewSessionState; namespace { // Beginning of inbound chain: send to cluster. struct ClusterSendHandler : public FrameHandler { - SessionState& session; + PreviewSessionState& session; Cluster& cluster; bool busy; Monitor lock; - ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} + ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} void handle(AMQFrame& f) { Mutex::ScopedLock l(lock); @@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) { c.next = h; } -struct SessionObserver : public broker::SessionManager::Observer { +struct SessionObserver : public broker::PreviewSessionManager::Observer { Cluster& cluster; SessionObserver(Cluster& c) : cluster(c) {} - void opened(SessionState& s) { + void opened(PreviewSessionState& s) { // FIXME aconway 2008-01-29: IList for memory management. ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index b62b2be5f1..733db8003d 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -62,7 +62,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler virtual ~Cluster(); // FIXME aconway 2008-01-29: - intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; } + intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -116,7 +116,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - intrusive_ptr<broker::SessionManager::Observer> observer; + intrusive_ptr<broker::PreviewSessionManager::Observer> observer; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index ceafa389b0..0ea3953175 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin { cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getSessionManager().add(cluster->getObserver()); + broker->getPreviewSessionManager().add(cluster->getObserver()); } } }; diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h index 1be8856c13..b15e14d6f6 100644 --- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h +++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -33,6 +33,7 @@ namespace qpid { namespace framing { static ProtocolVersion highestProtocolVersion(99, 0); +//static ProtocolVersion highestProtocolVersion(0, 10); } /* namespace framing */ } /* namespace qpid */ diff --git a/qpid/cpp/src/qpid/framing/Proxy.h b/qpid/cpp/src/qpid/framing/Proxy.h index b6ac897e96..86b99a83b0 100644 --- a/qpid/cpp/src/qpid/framing/Proxy.h +++ b/qpid/cpp/src/qpid/framing/Proxy.h @@ -39,6 +39,7 @@ class Proxy void send(const AMQBody&); ProtocolVersion getVersion() const; + FrameHandler& getHandler() { return out; } protected: FrameHandler& out; diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp index 700aeef47c..715cdaec2a 100644 --- a/qpid/cpp/src/tests/exception_test.cpp +++ b/qpid/cpp/src/tests/exception_test.cpp @@ -92,7 +92,7 @@ BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) { BOOST_CHECK_THROW(session.close(), InternalErrorException); } -BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) { +BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, ProxySessionFixture) { BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException); } diff --git a/qpid/cpp/src/tests/serialize.cpp b/qpid/cpp/src/tests/serialize.cpp index 72a92ee226..a120be6458 100644 --- a/qpid/cpp/src/tests/serialize.cpp +++ b/qpid/cpp/src/tests/serialize.cpp @@ -79,7 +79,7 @@ template <class A, class B, class C, class D> struct concat4 { typedef typename typedef mpl::vector<Bit, Boolean, Char, Int32, Int64, Int8, Uint16, CharUtf32, Uint32, Uint64, Bin8, Uint8>::type IntegralTypes; typedef mpl::vector<Bin1024, Bin128, Bin16, Bin256, Bin32, Bin40, Bin512, Bin64, Bin72>::type BinTypes; typedef mpl::vector<Double, Float>::type FloatTypes; -typedef mpl::vector<SequenceNo, Uuid, DateTime, Dec32, Dec64> FixedSizeClassTypes; +typedef mpl::vector<SequenceNo, Uuid, Datetime, Dec32, Dec64> FixedSizeClassTypes; typedef mpl::vector<Vbin8, Str8Latin, Str8, Str8Utf16, Vbin16, Str16Latin, Str16, Str16Utf16, Vbin32> VariableSizeTypes; @@ -106,7 +106,7 @@ BOOST_AUTO_TEST_CASE(testNetworkByteOrder) { void testValue(bool& b) { b = true; } template <class T> typename boost::enable_if<boost::is_arithmetic<T> >::type testValue(T& n) { n=42; } void testValue(long long& l) { l = 12345; } -void testValue(DateTime& dt) { dt = qpid::sys::now(); } +void testValue(Datetime& dt) { dt = qpid::sys::now(); } void testValue(Uuid& uuid) { uuid=Uuid(true); } template <class E, class M> void testValue(Decimal<E,M>& d) { d.exponent=2; d.mantissa=1234; } void testValue(SequenceNo& s) { s = 42; } diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index bbdb501b80..d1e3293a3e 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -20,7 +20,7 @@ - --> -<amqp major="0" minor="10" port="5672"> +<amqp major="99" minor="0" port="5672"> <class name = "cluster" index = "201"> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 0e36a09bbd..e02b3d6643 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.net.URISyntaxException; + import javax.jms.Destination; import javax.naming.NamingException; import javax.naming.Reference; @@ -31,7 +33,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; public abstract class AMQDestination implements Destination, Referenceable @@ -50,6 +51,8 @@ public abstract class AMQDestination implements Destination, Referenceable private AMQShortString _routingKey; + private AMQShortString[] _bindingKeys; + private String _url; private AMQShortString _urlAsShortString; @@ -64,7 +67,7 @@ public abstract class AMQDestination implements Destination, Referenceable public static final Integer TOPIC_TYPE = Integer.valueOf(2); public static final Integer UNKNOWN_TYPE = Integer.valueOf(3); - protected AMQDestination(String url) throws URLSyntaxException + protected AMQDestination(String url) throws URISyntaxException { this(new AMQBindingURL(url)); } @@ -79,26 +82,43 @@ public abstract class AMQDestination implements Destination, Referenceable _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName()); _routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey()); + _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) { - this(exchangeName, exchangeClass, routingKey, false, false, queueName); + this(exchangeName, exchangeClass, routingKey, false, false, queueName, null); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, AMQShortString[] bindingKeys) + { + this(exchangeName, exchangeClass, routingKey, false, false, queueName,bindingKeys); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName) { - this(exchangeName, exchangeClass, destinationName, false, false, null); + this(exchangeName, exchangeClass, destinationName, false, false, null,null); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName) + boolean isAutoDelete, AMQShortString queueName) { - this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false); + this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,null); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + boolean isAutoDelete, AMQShortString queueName,AMQShortString[] bindingKeys) + { + this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,bindingKeys); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable){ + this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,null); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys) { // If used with a fannout exchange, the routing key can be null if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null) @@ -120,6 +140,7 @@ public abstract class AMQDestination implements Destination, Referenceable _isAutoDelete = isAutoDelete; _queueName = queueName; _isDurable = isDurable; + _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys; } public AMQShortString getEncodedName() @@ -181,6 +202,20 @@ public abstract class AMQDestination implements Destination, Referenceable return _routingKey; } + public AMQShortString[] getBindingKeys() + { + if (_bindingKeys != null && _bindingKeys.length > 0) + { + return _bindingKeys; + } + else + { + // catering to the common use case where the + //routingKey is the same as the bindingKey. + return new AMQShortString[]{_routingKey}; + } + } + public boolean isExclusive() { return _isExclusive; @@ -239,6 +274,21 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); } + // We can't allow both routingKey and bindingKey + if (_routingKey == null && _bindingKeys != null && _bindingKeys.length>0) + { + + for (AMQShortString bindingKey:_bindingKeys) + { + sb.append(BindingURL.OPTION_BINDING_KEY); + sb.append("='"); + sb.append(bindingKey); + sb.append("'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + + } + } + if (_isDurable) { sb.append(BindingURL.OPTION_DURABLE); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 924b20e3ba..78b01add14 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -61,8 +61,14 @@ public class AMQQueue extends AMQDestination implements Queue public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) { super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, - false, queueName, false); } + false, queueName, false); + } + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) + { + super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, + false, queueName, false,bindingKeys); + } /** * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. @@ -126,11 +132,15 @@ public class AMQQueue extends AMQDestination implements Queue this(exchangeName, routingKey, queueName, exclusive, autoDelete, false); } - public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) { + this(exchangeName,routingKey,queueName,exclusive,autoDelete,durable,null); + } + + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys) + { super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive, - autoDelete, queueName, durable); + autoDelete, queueName, durable, bindingKeys); } public AMQShortString getRoutingKey() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ac5055ccd0..95d9b45b10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.io.Serializable; +import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; @@ -86,7 +87,6 @@ import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -564,19 +564,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException + final AMQShortString exchangeName,final AMQDestination destination) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - QueueBindBody body = getMethodRegistry().createQueueBindBody(getTicket(),queueName,exchangeName,routingKey,false,arguments); - - AMQFrame queueBind = body.generateFrame(_channelId); - - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); - + sendQueueBind(queueName,routingKey,arguments,exchangeName,destination); return null; } }, _connection).execute(); @@ -587,12 +582,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { if( consumer.getQueuename() != null) { - bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName()); + bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd); } } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException, FailoverException; + final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException; /** @@ -1036,7 +1031,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { return new AMQQueue(new AMQBindingURL(queueName)); } - catch (URLSyntaxException urlse) + catch (URISyntaxException urlse) { JMSException jmse = new JMSException(urlse.getReason()); jmse.setLinkedException(urlse); @@ -1253,7 +1248,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { return new AMQTopic(new AMQBindingURL(topicName)); } - catch (URLSyntaxException urlse) + catch (URISyntaxException urlse) { JMSException jmse = new JMSException(urlse.getReason()); jmse.setLinkedException(urlse); @@ -1380,6 +1375,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + public void declareAndBind(AMQDestination amqd) + throws + AMQException + { + AMQProtocolHandler protocolHandler = getProtocolHandler(); + declareExchange(amqd, protocolHandler, false); + AMQShortString queueName = declareQueue(amqd, protocolHandler); + bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd); + } + /** * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message. * @@ -1820,35 +1825,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Be aware of possible changes to parameter order as versions change. */ - boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException - { - try - { - AMQMethodEvent response = - new FailoverRetrySupport<AMQMethodEvent, AMQException>( - new FailoverProtectedOperation<AMQMethodEvent, AMQException>() - { - public AMQMethodEvent execute() throws AMQException, FailoverException - { - ExchangeBoundBody body = getMethodRegistry().createExchangeBoundBody(exchangeName, routingKey, queueName); - AMQFrame boundFrame = body.generateFrame(_channelId); - - return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); - - } - }, _connection).execute(); - - // Extract and return the response code from the query. - ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); - - return (responseBody.getReplyCode() == 0); - } - catch (AMQException e) - { - throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); - } - } + public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException; + + public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover @@ -2509,16 +2489,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public void declareAndBind(AMQDestination amqd) - throws - AMQException - { - AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName()); - } - /** * Callers must hold the failover mutex before calling this method. * @@ -2540,7 +2510,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.setQueuename(queueName); // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName()); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 407f1f3786..8039e3a163 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import javax.jms.*; import javax.jms.IllegalStateException; + import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; @@ -159,7 +160,7 @@ public class AMQSession_0_10 extends AMQSession AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - + _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); @@ -213,7 +214,7 @@ public class AMQSession_0_10 extends AMQSession * @param arguments 0_8 specific */ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName) + final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) throws AMQException, FailoverException { Map args = FiledTableSupport.convertToMap(arguments); @@ -222,7 +223,12 @@ public class AMQSession_0_10 extends AMQSession { args.put("x-match", "any"); } - getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), args); + + for (AMQShortString rk: destination.getBindingKeys()) + { + _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); + getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); + } // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); @@ -238,6 +244,7 @@ public class AMQSession_0_10 extends AMQSession */ public void sendClose(long timeout) throws AMQException, FailoverException { + getQpidSession().sync(); getQpidSession().sessionClose(); getCurrentException(); } @@ -350,19 +357,37 @@ public class AMQSession_0_10 extends AMQSession /** * Bind a queue with an exchange. */ - public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, - final AMQShortString routingKey) throws JMSException + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException + { + return isQueueBound(exchangeName,queueName,routingKey,null); + } + + public boolean isQueueBound(final AMQDestination destination) throws JMSException + { + return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys()); + } + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) + throws JMSException { String rk = ""; boolean res; - if (routingKey != null) + if (bindingKeys != null && bindingKeys.length>0) + { + rk = bindingKeys[0].toString(); + } + else if (routingKey != null) { rk = routingKey.toString(); } + Future<BindingQueryResult> result = - getQpidSession().bindingQuery(exchangeName.toString(), queueName.toString(), rk, null); + getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null); BindingQueryResult bindingQueryResult = result.get(); - if (routingKey == null) + + if (rk == null) { res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); } @@ -577,7 +602,7 @@ public class AMQSession_0_10 extends AMQSession { // this is done so that we can produce to a temporary queue beofre we create a consumer sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); - sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName()); + sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result); result.setQueueName(result.getRoutingKey()); } catch (Exception e) @@ -701,7 +726,7 @@ public class AMQSession_0_10 extends AMQSession AMQShortString topicName; if (topic instanceof AMQTopic) { - topicName=((AMQTopic) topic).getRoutingKey(); + topicName=((AMQTopic) topic).getBindingKeys()[0]; } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 59129bd28c..0450cffcaf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -94,7 +94,7 @@ public class AMQSession_0_8 extends AMQSession } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException, FailoverException + final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException { getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody (getTicket(),queueName,exchangeName,routingKey,false,arguments). @@ -171,6 +171,11 @@ public class AMQSession_0_8 extends AMQSession } } + public boolean isQueueBound(final AMQDestination destination) throws JMSException + { + return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName()); + } + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index e571b5437e..40041afdc6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -49,6 +49,11 @@ public class AMQTopic extends AMQDestination implements Topic super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); } + public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) + { + super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys); + } + public AMQTopic(AMQConnection conn, String routingKey) { this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey)); @@ -77,6 +82,11 @@ public class AMQTopic extends AMQDestination implements Topic super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable ); } + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys) + { + super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys); + } public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 74e466acc4..25494428ea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -51,7 +51,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me /** The connection being used by this consumer */ protected final AMQConnection _connection; - private final String _messageSelector; + protected final String _messageSelector; private final boolean _noLocal; @@ -740,6 +740,8 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } else { + // we should not be allowed to add a message is the + // consumer is closed _synchronousQueue.put(jmsMessage); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ef0f219092..74aac53cda 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -38,7 +38,6 @@ import javax.jms.JMSException; import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLong; import java.util.Iterator; /** @@ -122,8 +121,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } catch (AMQException e) { + _logger.error("Receivecd an Exception when receiving message",e); try { + getSession().getAMQConnection().getExceptionListener() .onException(new JMSAMQException("Error when receiving message", e)); } @@ -135,6 +136,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } if (messageOk) { + _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); } } @@ -290,7 +292,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // TODO Use a tag for fiding out if message filtering is done here or by the broker. try { - if (getMessageSelector() != null && !getMessageSelector().equals("")) + if (_messageSelector != null && !_messageSelector.equals("")) { messageOk = _filter.matches(message); } @@ -332,6 +334,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _logger.debug("filterMessage - trying to acquire message"); } messageOk = acquireMessage(message); + _logger.debug("filterMessage - *************************************"); + _logger.debug("filterMessage - message acquire status : " + messageOk); + _logger.debug("filterMessage - *************************************"); } return messageOk; } @@ -393,13 +398,29 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _0_10session.getQpidSession() .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE); + + _logger.debug("acquireMessage, sent acquire message to broker"); + _0_10session.getQpidSession().sync(); + + _logger.debug("acquireMessage, returned from sync"); + RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages(); + + _logger.debug("acquireMessage, acquired range set " + acquired); + if (acquired != null && acquired.size() > 0) { result = true; } + + _logger.debug("acquireMessage, Trying to get current exception "); + _0_10session.getCurrentException(); + + _logger.debug("acquireMessage, returned from getting current exception "); + + _logger.debug("acquireMessage, acquired range set " + acquired + " now returning " ); } return result; } @@ -473,4 +494,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _session.rejectMessage(message, true); } } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 42c1d687cb..c6baf0b0fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -17,22 +17,22 @@ */ package org.apache.qpid.client; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import java.io.IOException; +import java.net.URISyntaxException; + +import javax.jms.JMSException; +import javax.jms.Message; + import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.FiledTableSupport; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpidity.njms.ExceptionHelper; import org.apache.qpidity.nclient.util.ByteBufferMessage; -import org.apache.qpidity.transport.ReplyTo; +import org.apache.qpidity.njms.ExceptionHelper; import org.apache.qpidity.transport.DeliveryProperties; - -import javax.jms.Message; -import javax.jms.JMSException; -import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.qpidity.transport.ReplyTo; /** * This is a 0_10 message producer. @@ -154,12 +154,20 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer String replyToURL = contentHeaderProperties.getReplyToAsString(); if (replyToURL != null) { + if(_logger.isDebugEnabled()) + { + StringBuffer b = new StringBuffer(); + b.append("\n=========================="); + b.append("\nReplyTo : " + replyToURL); + b.append("\n=========================="); + _logger.debug(b.toString()); + } AMQBindingURL dest; try { dest = new AMQBindingURL(replyToURL); } - catch (URLSyntaxException e) + catch (URISyntaxException e) { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } @@ -198,8 +206,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(), - destination.getRoutingKey()); + return _session.isQueueBound(destination); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index abca931acc..5baf058668 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,30 +20,34 @@ */ package org.apache.qpid.client.message; -import org.apache.commons.collections.map.ReferenceMap; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; +import java.util.UUID; -import org.apache.mina.common.ByteBuffer; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; +import org.apache.commons.collections.map.ReferenceMap; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.client.*; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; - -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; -import java.util.UUID; -import java.io.IOException; -import java.net.URISyntaxException; public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 4eeb702703..bf2a2fdd73 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -140,8 +140,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory props.setType(mprop.getType()); props.setUserId(mprop.getUserId()); props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders())); - AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props); - message.receivedFromServer(); + AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props); return message; } @@ -152,7 +151,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); msg.setJMSRedelivered(redelivered); - + msg.receivedFromServer(); return msg; } @@ -164,7 +163,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory final AbstractJMSMessage msg = create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL); msg.setJMSRedelivered(redelivered); - + msg.receivedFromServer(); return msg; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index b3f46ab0f6..43ac56dee9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -20,6 +20,24 @@ */ package org.apache.qpid.jndi; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; + import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQHeadersExchange; @@ -30,28 +48,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Queue; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.spi.InitialContextFactory; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; - public class PropertiesFileInitialContextFactory implements InitialContextFactory { protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class); @@ -184,6 +183,17 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor Topic t = createTopic(entry.getValue().toString()); if (t != null) { + if (_logger.isDebugEnabled()) + { + StringBuffer b = new StringBuffer(); + b.append("Creating the topic: " + jndiName + " with the following binding keys "); + for (AMQShortString binding:((AMQTopic)t).getBindingKeys()) + { + b.append(binding.toString()).append(","); + } + + _logger.debug(b.toString()); + } data.put(jndiName, t); } } @@ -219,7 +229,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } catch (URISyntaxException urlse) { - _logger.warn("Unable to destination:" + urlse); + _logger.warn("Unable to create destination:" + urlse, urlse); return null; } @@ -268,7 +278,17 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } else if (value instanceof String) { - return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value)); + String[] keys = ((String)value).split(","); + AMQShortString[] bindings = new AMQShortString[keys.length]; + int i = 0; + for (String key:keys) + { + bindings[i] = new AMQShortString(key); + i++; + } + // The Destination has a dual nature. If this was used for a producer the key is used + // for the routing key. If it was used for the consumer it becomes the bindingKey + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings); } else if (value instanceof BindingURL) { diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java new file mode 100644 index 0000000000..83d491baad --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpidity.nclient.impl; + +/** + * This class holds all the 0.10 client constants which value can be set + * through properties. + */ +public class Constants +{ + static + { + + String max="message_size_before_sync";// KB's + try + { + MAX_NOT_SYNC_DATA_LENGH=new Long(System.getProperties().getProperty(max, "200000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_SYNC_DATA_LENGH=200000000; + } + String flush="message_size_before_flush"; + try + { + MAX_NOT_FLUSH_DATA_LENGH=new Long(System.getProperties().getProperty(flush, "2000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_FLUSH_DATA_LENGH=20000000; + } + } + + /** + * The total message size in KBs that can be transferted before + * client and broker are synchronized. + * A sync will result in the client library releasing the sent messages + * from memory. (messages are kept + * in memory so client can reconnect to a broker in the event of a failure) + * <p> + * Property name: message_size_before_sync + * <p> + * Default value: 200000000 + */ + public static long MAX_NOT_SYNC_DATA_LENGH; + /** + * The total message size in KBs that can be transferted before + * messages are flushed. + * When a flush returns all messages have reached the broker. + * <p> + * Property name: message_size_before_flush + * <p> + * Default value: 200000000 + */ + public static long MAX_NOT_FLUSH_DATA_LENGH; + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java index 66be1ebc73..88dd212ab6 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,12 +23,18 @@ package org.apache.qpid.test.unit.client.destinationurl; import junit.framework.TestCase; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.test.unit.basic.PropertyValueTest; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URISyntaxException; public class DestinationURLTest extends TestCase { - public void testFullURL() throws URLSyntaxException + private static final Logger _logger = LoggerFactory.getLogger(DestinationURLTest.class); + + public void testFullURL() throws URISyntaxException { String url = "exchange.Class://exchangeName/Destination/Queue"; @@ -43,7 +49,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("Queue")); } - public void testQueue() throws URLSyntaxException + public void testQueue() throws URISyntaxException { String url = "exchangeClass://exchangeName//Queue"; @@ -58,7 +64,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("Queue")); } - public void testQueueWithOption() throws URLSyntaxException + public void testQueueWithOption() throws URISyntaxException { String url = "exchangeClass://exchangeName//Queue?option='value'"; @@ -75,7 +81,7 @@ public class DestinationURLTest extends TestCase } - public void testDestination() throws URLSyntaxException + public void testDestination() throws URISyntaxException { String url = "exchangeClass://exchangeName/Destination/"; @@ -90,7 +96,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("")); } - public void testDestinationWithOption() throws URLSyntaxException + public void testDestinationWithOption() throws URISyntaxException { String url = "exchangeClass://exchangeName/Destination/?option='value'"; @@ -107,7 +113,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getOption("option").equals("value")); } - public void testDestinationWithMultiOption() throws URLSyntaxException + public void testDestinationWithMultiOption() throws URISyntaxException { String url = "exchangeClass://exchangeName/Destination/?option='value',option2='value2'"; @@ -123,7 +129,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getOption("option2").equals("value2")); } - public void testDestinationWithNoExchangeDefaultsToDirect() throws URLSyntaxException + public void testDestinationWithNoExchangeDefaultsToDirect() throws URISyntaxException { String url = "IBMPerfQueue1?durable='true'"; @@ -138,6 +144,41 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getOption("durable").equals("true")); } + public void testDestinationWithMultiBindingKeys() throws URISyntaxException + { + + String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',bindingkey='key2'"; + + AMQBindingURL dest = new AMQBindingURL(url); + + assertTrue(dest.getExchangeClass().equals("exchangeClass")); + assertTrue(dest.getExchangeName().equals("exchangeName")); + assertTrue(dest.getDestinationName().equals("Destination")); + assertTrue(dest.getQueueName().equals("")); + + assertTrue(dest.getBindingKeys().length == 2); + } + + // You can only specify only a routing key or binding key, but not both. + public void testDestinationIfOnlyRoutingKeyOrBindingKeyIsSpecified() throws URISyntaxException + { + + String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',routingkey='key2'"; + boolean exceptionThrown = false; + try + { + + AMQBindingURL dest = new AMQBindingURL(url); + } + catch(URISyntaxException e) + { + exceptionThrown = true; + _logger.info("Exception thrown",e); + } + + assertTrue("Failed to throw an URISyntaxException when both the bindingkey and routingkey is specified",exceptionThrown); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(DestinationURLTest.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 529a05b2e2..998242925c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -20,28 +20,29 @@ */ package org.apache.qpid.url; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; - public class AMQBindingURL implements BindingURL { private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class); String _url; - AMQShortString _exchangeClass; - AMQShortString _exchangeName; - AMQShortString _destinationName; - AMQShortString _queueName; + AMQShortString _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + AMQShortString _exchangeName = new AMQShortString(""); + AMQShortString _destinationName = new AMQShortString("");; + AMQShortString _queueName = new AMQShortString(""); + AMQShortString[] _bindingKeys = new AMQShortString[0]; private HashMap<String, String> _options; - public AMQBindingURL(String url) throws URLSyntaxException + public AMQBindingURL(String url) throws URISyntaxException { // format: // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* @@ -52,116 +53,35 @@ public class AMQBindingURL implements BindingURL parseBindingURL(); } - private void parseBindingURL() throws URLSyntaxException + private void parseBindingURL() throws URISyntaxException { - try - { - URI connection = new URI(_url); - - String exchangeClass = connection.getScheme(); - - if (exchangeClass == null) - { - _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url; - // URLHelper.parseError(-1, "Exchange Class not specified.", _url); - parseBindingURL(); - - return; - } - else - { - setExchangeClass(exchangeClass); - } - - String exchangeName = connection.getHost(); - - if (exchangeName == null) - { - if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) - { - setExchangeName(""); - } - else - { - throw URLHelper.parseError(-1, "Exchange Name not specified.", _url); - } - } - else - { - setExchangeName(exchangeName); - } - - String queueName; - - if ((connection.getPath() == null) || connection.getPath().equals("")) - { - throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), - "Destination or Queue requried", _url); - } - else - { - int slash = connection.getPath().indexOf("/", 1); - if (slash == -1) - { - throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), - "Destination requried", _url); - } - else - { - String path = connection.getPath(); - setDestinationName(path.substring(1, slash)); - - // We don't set queueName yet as the actual value we use depends on options set - // when we are dealing with durable subscriptions - - queueName = path.substring(slash + 1); - - } - } - - URLHelper.parseOptions(_options, connection.getQuery()); - - processOptions(); - - // We can now call setQueueName as the URL is full parsed. - - setQueueName(queueName); - - // Fragment is #string (not used) - _logger.debug("URL Parsed: " + this); - - } - catch (URISyntaxException uris) - { - - throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); - - } + BindingURLParser parser = new BindingURLParser(_url,this); + processOptions(); + _logger.debug("URL Parsed: " + this); } - private void setExchangeClass(String exchangeClass) + public void setExchangeClass(String exchangeClass) { setExchangeClass(new AMQShortString(exchangeClass)); } - private void setQueueName(String name) throws URLSyntaxException + public void setQueueName(String name) { setQueueName(new AMQShortString(name)); } - private void setDestinationName(String name) + public void setDestinationName(String name) { setDestinationName(new AMQShortString(name)); } - private void setExchangeName(String exchangeName) + public void setExchangeName(String exchangeName) { setExchangeName(new AMQShortString(exchangeName)); } - private void processOptions() + private void processOptions() throws URISyntaxException { - // this is where we would parse any options that needed more than just storage. } public String getURL() @@ -210,34 +130,9 @@ public class AMQBindingURL implements BindingURL return _queueName; } - public void setQueueName(AMQShortString name) throws URLSyntaxException + public void setQueueName(AMQShortString name) { - if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) - { - if (Boolean.parseBoolean(getOption(OPTION_DURABLE))) - { - if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) - { - _queueName = - new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION)); - } - else - { - throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID - + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url); - - } - } - else - { - _queueName = null; - } - } - else - { - _queueName = name; - } - + _queueName = name; } public String getOption(String key) @@ -261,7 +156,7 @@ public class AMQBindingURL implements BindingURL { if (containsOption(BindingURL.OPTION_ROUTING_KEY)) { - return new AMQShortString(getOption(OPTION_ROUTING_KEY)); + return new AMQShortString((String)getOption(OPTION_ROUTING_KEY)); } else { @@ -271,12 +166,29 @@ public class AMQBindingURL implements BindingURL if (containsOption(BindingURL.OPTION_ROUTING_KEY)) { - return new AMQShortString(getOption(OPTION_ROUTING_KEY)); + return new AMQShortString((String)getOption(OPTION_ROUTING_KEY)); } return getDestinationName(); } + public AMQShortString[] getBindingKeys() + { + if (_bindingKeys != null && _bindingKeys.length>0) + { + return _bindingKeys; + } + else + { + return new AMQShortString[]{getRoutingKey()}; + } + } + + public void setBindingKeys(AMQShortString[] keys) + { + _bindingKeys = keys; + } + public void setRoutingKey(AMQShortString key) { setOption(OPTION_ROUTING_KEY, key.toString()); @@ -296,6 +208,29 @@ public class AMQBindingURL implements BindingURL sb.append(URLHelper.printOptions(_options)); - return sb.toString(); + // temp hack + if (getRoutingKey() == null || getRoutingKey().toString().equals("")) + { + + if (sb.toString().indexOf("?") == -1) + { + sb.append("?"); + } + else + { + sb.append("&"); + } + + for (AMQShortString key :_bindingKeys) + { + sb.append(BindingURL.OPTION_BINDING_KEY).append("='").append(key.toString()).append("'&"); + } + + return sb.toString().substring(0,sb.toString().length()-1); + } + else + { + return sb.toString(); + } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 67be2db86f..25450fea64 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,6 +34,7 @@ public interface BindingURL public static final String OPTION_CLIENTID = "clientid"; public static final String OPTION_SUBSCRIPTION = "subscription"; public static final String OPTION_ROUTING_KEY = "routingkey"; + public static final String OPTION_BINDING_KEY = "bindingkey"; String getURL(); @@ -52,5 +53,7 @@ public interface BindingURL AMQShortString getRoutingKey(); + AMQShortString[] getBindingKeys(); + String toString(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java new file mode 100644 index 0000000000..5d26e7e65b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java @@ -0,0 +1,445 @@ +package org.apache.qpid.url; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BindingURLParser +{ + private static final char PROPERTY_EQUALS_CHAR = '='; + private static final char PROPERTY_SEPARATOR_CHAR = '&'; + private static final char ALTERNATIVE_PROPERTY_SEPARATOR_CHAR = ','; + private static final char FORWARD_SLASH_CHAR = '/'; + private static final char QUESTION_MARK_CHAR = '?'; + private static final char SINGLE_QUOTE_CHAR = '\''; + private static final char COLON_CHAR = ':'; + private static final char END_OF_URL_MARKER_CHAR = '%'; + + private static final Logger _logger = LoggerFactory.getLogger(BindingURLImpl.class); + + private char[] _url; + private AMQBindingURL _bindingURL; + private BindingURLParserState _currentParserState; + private String _error; + private int _index = 0; + private String _currentPropName; + private Map<String,Object> _options = new HashMap<String,Object>(); + + //<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + public BindingURLParser(String url,AMQBindingURL bindingURL) throws URISyntaxException + { + _url = (url + END_OF_URL_MARKER_CHAR).toCharArray(); + _bindingURL = bindingURL; + _currentParserState = BindingURLParserState.BINDING_URL_START; + BindingURLParserState prevState = _currentParserState; + + try + { + while (_currentParserState != BindingURLParserState.ERROR && _currentParserState != BindingURLParserState.BINDING_URL_END) + { + prevState = _currentParserState; + _currentParserState = next(); + } + + if (_currentParserState == BindingURLParserState.ERROR) + { + _error = + "Invalid URL format [current_state = " + prevState + ", details parsed so far " + _bindingURL + " ] error at (" + _index + ") due to " + _error; + _logger.debug(_error); + URISyntaxException ex; + ex = new URISyntaxException(markErrorLocation(),"Error occured while parsing URL",_index); + throw ex; + } + + processOptions(); + } + catch (ArrayIndexOutOfBoundsException e) + { + _error = "Invalid URL format [current_state = " + prevState + ", details parsed so far " + _bindingURL + " ] error at (" + _index + ")"; + URISyntaxException ex = new URISyntaxException(markErrorLocation(),"Error occured while parsing URL",_index); + ex.initCause(e); + throw ex; + } + } + + enum BindingURLParserState + { + BINDING_URL_START, + EXCHANGE_CLASS, + COLON_CHAR, + DOUBLE_SEP, + EXCHANGE_NAME, + EXCHANGE_SEPERATOR_CHAR, + DESTINATION, + DESTINATION_SEPERATOR_CHAR, + QUEUE_NAME, + QUESTION_MARK_CHAR, + PROPERTY_NAME, + PROPERTY_EQUALS, + START_PROPERTY_VALUE, + PROPERTY_VALUE, + END_PROPERTY_VALUE, + PROPERTY_SEPARATOR, + BINDING_URL_END, + ERROR + } + + /** + * I am fully ware that there are few optimizations + * that can speed up things a wee bit. But I have opted + * for readability and maintainability at the expense of + * speed, as speed is not a critical factor here. + * + * One can understand the full parse logic by just looking at this method. + */ + private BindingURLParserState next() + { + switch (_currentParserState) + { + case BINDING_URL_START: + return extractExchangeClass(); + case COLON_CHAR: + _index++; //skip ":" + return BindingURLParserState.DOUBLE_SEP; + case DOUBLE_SEP: + _index = _index + 2; //skip "//" + return BindingURLParserState.EXCHANGE_NAME; + case EXCHANGE_NAME: + return extractExchangeName(); + case EXCHANGE_SEPERATOR_CHAR: + _index++; // skip '/' + return BindingURLParserState.DESTINATION; + case DESTINATION: + return extractDestination(); + case DESTINATION_SEPERATOR_CHAR: + _index++; // skip '/' + return BindingURLParserState.QUEUE_NAME; + case QUEUE_NAME: + return extractQueueName(); + case QUESTION_MARK_CHAR: + _index++; // skip '?' + return BindingURLParserState.PROPERTY_NAME; + case PROPERTY_NAME: + return extractPropertyName(); + case PROPERTY_EQUALS: + _index++; // skip the equal sign + return BindingURLParserState.START_PROPERTY_VALUE; + case START_PROPERTY_VALUE: + _index++; // skip the '\'' + return BindingURLParserState.PROPERTY_VALUE; + case PROPERTY_VALUE: + return extractPropertyValue(); + case END_PROPERTY_VALUE: + _index ++; + return checkEndOfURL(); + case PROPERTY_SEPARATOR: + _index++; // skip '&' + return BindingURLParserState.PROPERTY_NAME; + default: + return BindingURLParserState.ERROR; + } + } + + private BindingURLParserState extractExchangeClass() + { + char nextChar = _url[_index]; + + // check for the following special cases. + // "myQueue?durable='true'" or just "myQueue"; + + StringBuilder builder = new StringBuilder(); + while (nextChar != COLON_CHAR && nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + // normal use case + if (nextChar == COLON_CHAR) + { + _bindingURL.setExchangeClass(builder.toString()); + return BindingURLParserState.COLON_CHAR; + } + // "myQueue?durable='true'" use case + else if (nextChar == QUESTION_MARK_CHAR) + { + _bindingURL.setExchangeClass(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()); + _bindingURL.setExchangeName(""); + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.QUESTION_MARK_CHAR; + } + else + { + _bindingURL.setExchangeClass(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()); + _bindingURL.setExchangeName(""); + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.BINDING_URL_END; + } + } + + private BindingURLParserState extractExchangeName() + { + char nextChar = _url[_index]; + StringBuilder builder = new StringBuilder(); + while (nextChar != FORWARD_SLASH_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + _bindingURL.setExchangeName(builder.toString()); + return BindingURLParserState.EXCHANGE_SEPERATOR_CHAR; + } + + private BindingURLParserState extractDestination() + { + char nextChar = _url[_index]; + + //The destination is and queue name are both optional + // This is checking for the case where both are not specified. + if (nextChar == QUESTION_MARK_CHAR) + { + return BindingURLParserState.QUESTION_MARK_CHAR; + } + + StringBuilder builder = new StringBuilder(); + while (nextChar != FORWARD_SLASH_CHAR && nextChar != QUESTION_MARK_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + // This is the case where the destination is explictily stated. + // ex direct://amq.direct/myDest/myQueue?option1='1' ... OR + // direct://amq.direct//myQueue?option1='1' ... + if (nextChar == FORWARD_SLASH_CHAR) + { + _bindingURL.setDestinationName(builder.toString()); + return BindingURLParserState.DESTINATION_SEPERATOR_CHAR; + } + // This is the case where destination is not explictly stated. + // ex direct://amq.direct/myQueue?option1='1' ... + else + { + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.QUESTION_MARK_CHAR; + } + } + + private BindingURLParserState extractQueueName() + { + char nextChar = _url[_index]; + StringBuilder builder = new StringBuilder(); + while (nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR) + { + builder.append(nextChar); + nextChar = _url[++_index]; + } + _bindingURL.setQueueName(builder.toString()); + + if(nextChar == QUESTION_MARK_CHAR) + { + return BindingURLParserState.QUESTION_MARK_CHAR; + } + else + { + return BindingURLParserState.BINDING_URL_END; + } + } + + private BindingURLParserState extractPropertyName() + { + StringBuilder builder = new StringBuilder(); + char next = _url[_index]; + while (next != PROPERTY_EQUALS_CHAR) + { + builder.append(next); + next = _url[++_index]; + } + _currentPropName = builder.toString(); + + if (_currentPropName.trim().equals("")) + { + _error = "Property name cannot be empty"; + return BindingURLParserState.ERROR; + } + + return BindingURLParserState.PROPERTY_EQUALS; + } + + private BindingURLParserState extractPropertyValue() + { + StringBuilder builder = new StringBuilder(); + char next = _url[_index]; + while (next != SINGLE_QUOTE_CHAR) + { + builder.append(next); + next = _url[++_index]; + } + String propValue = builder.toString(); + + if (propValue.trim().equals("")) + { + _error = "Property values cannot be empty"; + return BindingURLParserState.ERROR; + } + else + { + if (_options.containsKey(_currentPropName)) + { + Object obj = _options.get(_currentPropName); + if (obj instanceof List) + { + List list = (List)obj; + list.add(propValue); + } + else // it has to be a string + { + List<String> list = new ArrayList(); + list.add((String)obj); + list.add(propValue); + _options.put(_currentPropName, list); + } + } + else + { + _options.put(_currentPropName, propValue); + } + + + return BindingURLParserState.END_PROPERTY_VALUE; + } + } + + private BindingURLParserState checkEndOfURL() + { + char nextChar = _url[_index]; + if ( nextChar == END_OF_URL_MARKER_CHAR) + { + return BindingURLParserState.BINDING_URL_END; + } + else if (nextChar == PROPERTY_SEPARATOR_CHAR || nextChar == ALTERNATIVE_PROPERTY_SEPARATOR_CHAR) + { + return BindingURLParserState.PROPERTY_SEPARATOR; + } + else + { + return BindingURLParserState.ERROR; + } + } + + private String markErrorLocation() + { + String tmp = String.valueOf(_url); + // length -1 to remove ENDOF URL marker + return tmp.substring(0,_index) + "^" + tmp.substring(_index+1> tmp.length()-1?tmp.length()-1:_index+1,tmp.length()-1); + } + + private void processOptions() throws URISyntaxException + { +// check for bindingKey + if (_options.containsKey(BindingURL.OPTION_BINDING_KEY) && _options.get(BindingURL.OPTION_BINDING_KEY) != null) + { + Object obj = _options.get(BindingURL.OPTION_BINDING_KEY); + + if (obj instanceof String) + { + AMQShortString[] bindingKeys = new AMQShortString[]{new AMQShortString((String)obj)}; + _bindingURL.setBindingKeys(bindingKeys); + } + else // it would be a list + { + List list = (List)obj; + AMQShortString[] bindingKeys = new AMQShortString[list.size()]; + int i=0; + for (Iterator it = list.iterator(); it.hasNext();) + { + bindingKeys[i] = new AMQShortString((String)it.next()); + i++; + } + _bindingURL.setBindingKeys(bindingKeys); + } + + } + for (String key: _options.keySet()) + { + // We want to skip the bindingKey list + if (_options.get(key) instanceof String) + { + _bindingURL.setOption(key, (String)_options.get(key)); + } + } + + + // check if both a binding key and a routing key is specified. + if (_options.containsKey(BindingURL.OPTION_BINDING_KEY) && _options.containsKey(BindingURL.OPTION_ROUTING_KEY)) + { + throw new URISyntaxException(String.valueOf(_url),"It is illegal to specify both a routingKey and a bindingKey in the same URL",-1); + } + + // check for durable subscriptions + if (_bindingURL.getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + String queueName = null; + if (Boolean.parseBoolean(_bindingURL.getOption(BindingURL.OPTION_DURABLE))) + { + if (_bindingURL.containsOption(BindingURL.OPTION_CLIENTID) && _bindingURL.containsOption(BindingURL.OPTION_SUBSCRIPTION)) + { + queueName = _bindingURL.getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + } + else + { + throw new URISyntaxException(String.valueOf(_url),"Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + + " and " + BindingURL.OPTION_SUBSCRIPTION , -1); + + } + } + _bindingURL.setQueueName(queueName); + } + } + + public static void main(String[] args) + { + String[] urls = new String[] + { + "topic://amq.topic//myTopic?routingkey='stocks.#'", + "topic://amq.topic/message_queue?bindingkey='usa.*'&bindingkey='control',exclusive='true'", + "topic://amq.topic//?bindingKey='usa.*',bindingkey='control',exclusive='true'", + "direct://amq.direct/dummyDest/myQueue?routingkey='abc.*'", + "exchange.Class://exchangeName/Destination/Queue", + "exchangeClass://exchangeName/Destination/?option='value',option2='value2'", + "IBMPerfQueue1?durable='true'", + "exchangeClass://exchangeName/Destination/?bindingkey='key1',bindingkey='key2'", + "exchangeClass://exchangeName/Destination/?bindingkey='key1'&routingkey='key2'" + }; + + try + { + for (String url: urls) + { + System.out.println("URL " + url); + AMQBindingURL bindingURL = new AMQBindingURL(url); + BindingURLParser parser = new BindingURLParser(url,bindingURL); + System.out.println("\nX " + bindingURL.toString() + " \n"); + + } + + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 1da06ab92b..b5f2efdf01 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -158,6 +158,7 @@ </target> <property name="test" value="*Test"/> + <property name="test.mem" value="512M"/> <property name="log" value="info"/> <property name="amqj.logging.level" value="${log}"/> @@ -190,6 +191,8 @@ description="execute unit tests"> <junit fork="yes" haltonfailure="no" printsummary="on" timeout="600000"> + <jvmarg value="-Xmx${test.mem}" /> + <sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/> <sysproperty key="root.logging.level" value="${root.logging.level}"/> <sysproperty key="log4j.configuration" value="${log4j.configuration}"/> diff --git a/qpid/python/mllib/dom.py b/qpid/python/mllib/dom.py index 10b19d6db1..7c759dbdd5 100644 --- a/qpid/python/mllib/dom.py +++ b/qpid/python/mllib/dom.py @@ -72,7 +72,7 @@ class Dispatcher: cls = cls.base return False - def dispatch(self, f): + def dispatch(self, f, attrs = ""): cls = self while cls != None: if hasattr(f, cls.type): @@ -81,7 +81,6 @@ class Dispatcher: cls = cls.base cls = self - attrs = "" while cls != None: if attrs: sep = ", " @@ -151,9 +150,10 @@ class Tag(Node): def dispatch(self, f): try: - method = getattr(f, "do_" + self.name) + attr = "do_" + self.name + method = getattr(f, attr) except AttributeError: - return Dispatcher.dispatch(self, f) + return Dispatcher.dispatch(self, f, "'%s'" % attr) return method(self) class Leaf(Component, Dispatcher): diff --git a/qpid/python/qpid/queue.py b/qpid/python/qpid/queue.py index af0565b6cc..00946a9156 100644 --- a/qpid/python/qpid/queue.py +++ b/qpid/python/qpid/queue.py @@ -24,36 +24,51 @@ content of a queue can be notified if the queue is no longer in use. """ from Queue import Queue as BaseQueue, Empty, Full +from threading import Thread class Closed(Exception): pass class Queue(BaseQueue): END = object() + STOP = object() def __init__(self, *args, **kwargs): BaseQueue.__init__(self, *args, **kwargs) - self._real_put = self.put - self.listener = self._real_put + self.listener = None + self.thread = None def close(self): self.put(Queue.END) def get(self, block = True, timeout = None): - self.put = self._real_put - try: - result = BaseQueue.get(self, block, timeout) - if result == Queue.END: - # this guarantees that any other waiting threads or any future - # calls to get will also result in a Closed exception - self.put(Queue.END) - raise Closed() - else: - return result - finally: - self.put = self.listener - pass + result = BaseQueue.get(self, block, timeout) + if result == Queue.END: + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Closed exception + self.put(Queue.END) + raise Closed() + else: + return result def listen(self, listener): self.listener = listener - self.put = self.listener + if listener == None: + if self.thread != None: + self.put(Queue.STOP) + self.thread.join() + self.thread = None + else: + if self.thread == None: + self.thread = Thread(target = self.run) + self.thread.setDaemon(True) + self.thread.start() + + def run(self): + while True: + try: + o = self.get() + if o == Queue.STOP: break + self.listener(o) + except Closed: + break diff --git a/qpid/python/tests/queue.py b/qpid/python/tests/queue.py index d2e495d207..e12354eb43 100644 --- a/qpid/python/tests/queue.py +++ b/qpid/python/tests/queue.py @@ -30,37 +30,32 @@ class QueueTest (TestCase): # all the queue functionality. def test_listen(self): - LISTEN = object() - GET = object() - EMPTY = object() + values = [] + heard = threading.Event() + def listener(x): + values.append(x) + heard.set() q = Queue(0) - values = [] - q.listen(lambda x: values.append((LISTEN, x))) + q.listen(listener) + heard.clear() q.put(1) - assert values[-1] == (LISTEN, 1) + heard.wait() + assert values[-1] == 1 + heard.clear() q.put(2) - assert values[-1] == (LISTEN, 2) - - class Getter(threading.Thread): + heard.wait() + assert values[-1] == 2 - def run(self): - try: - values.append((GET, q.get(timeout=10))) - except Empty: - values.append(EMPTY) - - g = Getter() - g.start() - # let the other thread reach the get - time.sleep(2) + q.listen(None) q.put(3) - g.join() - - assert values[-1] == (GET, 3) + assert q.get(3) == 3 + q.listen(listener) + heard.clear() q.put(4) - assert values[-1] == (LISTEN, 4) + heard.wait() + assert values[-1] == 4 def test_close(self): q = Queue(0) |