diff options
76 files changed, 2345 insertions, 548 deletions
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index 3d68ed7890..c17051d431 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -122,11 +122,11 @@ test -n "$RUBY" && generate=yes test -z "$RUBY" && AC_MSG_ERROR([Missing ruby installation (try "yum install ruby").]) specdir=`pwd`/$srcdir/../specs -AMQP_XML=$specdir/amqp.0-10-preview.xml -AC_SUBST(AMQP_XML) -ls $AMQP_XML >/dev/null 2>&1 || generate=no - -AM_CONDITIONAL([GENERATE], [test x$generate = xyes]) +AMQP_PREVIEW_XML=$specdir/amqp.0-10-preview.xml +AMQP_FINAL_XML=$specdir/amqp.0-10.xml +AC_SUBST(AMQP_PREVIEW_XML) +AC_SUBST(AMQP_FINAL_XML) +AM_CONDITIONAL([GENERATE], [ls $AMQP_FINAL_XML >/dev/null]) # URL and download URL for the package. URL=http://rhm.et.redhat.com/qpidc @@ -139,7 +139,6 @@ AC_CHECK_HEADERS([boost/shared_ptr.hpp uuid/uuid.h],, AC_MSG_ERROR([Missing required header files.])) # Check for optional CPG requirement. -save_ldflags=$LDFLAGS LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais" AC_ARG_WITH([cpg], @@ -157,15 +156,13 @@ AC_ARG_WITH([cpg], esac], [ # not specified - enable if libs/headers available. with_CPG=yes - AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no]) AC_CHECK_HEADERS([openais/cpg.h],,[with_CPG=no]) + AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no]) ] ) AM_CONDITIONAL([CPG], [test x$with_CPG = xyes]) if test x$with_CPG = xyes; then CPPFLAGS+=" -DCPG" -else - LDFLAGS=$save_ldflags fi # Files to generate diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in index 167247f67b..fd4a24b2e5 100644 --- a/qpid/cpp/qpidc.spec.in +++ b/qpid/cpp/qpidc.spec.in @@ -104,6 +104,7 @@ make check %files devel %defattr(-,root,root,-) %_includedir/qpid/*.h +%_includedir/qpid/amqp_0_10 %_includedir/qpid/client %_includedir/qpid/framing %_includedir/qpid/sys diff --git a/qpid/cpp/rubygen/0-10/specification.rb b/qpid/cpp/rubygen/0-10/specification.rb new file mode 100755 index 0000000000..026b49e1a9 --- /dev/null +++ b/qpid/cpp/rubygen/0-10/specification.rb @@ -0,0 +1,177 @@ +#!/usr/bin/env ruby +$: << ".." # Include .. in load path +require 'cppgen' + +class Specification < CppGen + def initialize(outdir, amqp) + super(outdir, amqp) + @ns="qpid::amqp_#{@amqp.version.bars}" + @dir="qpid/amqp_#{@amqp.version.bars}" + end + + # domains + + def domain_h(d) + genl + typename=d.name.typename + if d.enum + scope("enum #{typename} {", "};") { + genl d.enum.choices.map { |c| + "#{c.name.constname} = #{c.value}" }.join(",\n") + } + elsif (d.type_ == "array") + genl "typedef Array<#{ArrayTypes[d.name].amqp2cpp}> #{typename};" + else + genl "typedef #{d.type_.amqp2cpp} #{typename};" + end + end + + # class constants + + def class_h(c) + genl "const uint8_t CODE=#{c.code};" + genl "extern const char* NAME;" + end + + def class_cpp(c) + genl "const char* NAME=\"#{c.fqname}\";" + end + + # Used by structs, commands and controls. + def action_struct_h(x, base, consts, &block) + genl + struct(x.classname, "public #{base}") { + x.fields.each { |f| genl "#{f.type_.amqp2cpp} #{f.cppname};" } + genl + genl "static const char* NAME;" + consts.each { |c| genl "static const uint8_t #{c.upcase}=#{x.send c or 0};"} + genl "static const uint8_t CLASS_CODE=#{x.containing_class.nsname}::CODE;" + genl + genl "#{x.classname}();" + scope("#{x.classname}(",");") { genl x.parameters } unless x.fields.empty? + genl + genl "void accept(Visitor&) const;" + genl + yield if block + } + end + + def action_struct_cpp(x) + genl + genl "const char* #{x.classname}::NAME=\"#{x.fqname}\";" + genl + genl "#{x.classname}::#{x.classname}() {}"; + genl + if not x.fields.empty? + scope("#{x.classname}::#{x.classname}(",") :") { genl x.parameters } + indent() { genl x.initializers } + genl "{}" + genl + end + scope("void #{x.classname}::accept(Visitor&) const {","}") { + genl "// FIXME aconway 2008-02-27: todo" + } + end + + # structs + + def struct_h(s) action_struct_h(s, "Struct", ["size","pack","code"]); end + def struct_cpp(s) action_struct_cpp(s) end + + # command and control + + def action_h(a) + action_struct_h(a, a.base, ["code"]) { + scope("template <class T> void invoke(T& target) {","}") { + scope("target.#{a.funcname}(", ");") { genl a.values } + } + genl + scope("template <class S> void serialize(S& s) {","}") { + gen "s" + a.fields.each { |f| gen "(#{f.cppname})"} + genl ";" + } unless a.fields.empty? + } + end + + def action_cpp(a) action_struct_cpp(a); end + + # Types that must be generated early because they are used by other types. + def pregenerate?(x) not @amqp.used_by[x.fqname].empty?; end + + # Generate the log + def gen_specification() + h_file("#{@dir}/specification") { + include "#{@dir}/built_in_types" + include "#{@dir}/helpers" + include "<boost/call_traits.hpp>" + genl "using boost::call_traits;" + namespace(@ns) { + # Top level + @amqp.domains.each { |d| + # segment-type and track are are built in + domain_h d unless ["track","segment-type"].include?(d.name) + } + puts @amqp.used_by.inspect + + # Domains and structs that must be generated early because + # they are used by other definitions: + each_class_ns { |c| + class_h c + c.domains.each { |d| domain_h d if pregenerate? d } + c.structs.each { |s| struct_h s if pregenerate? s } + } + # Now dependent domains/structs and actions + each_class_ns { |c| + c.domains.each { |d| domain_h d if not pregenerate? d } + c.structs.each { |s| struct_h s if not pregenerate? s } + c.actions.each { |a| action_h a } + } + } + } + + cpp_file("#{@dir}/specification") { + include "#{@dir}/specification" + namespace(@ns) { + each_class_ns { |c| + class_cpp c + c.actions.each { |a| action_cpp a} + c.structs.each { |s| struct_cpp s } + } + } + } + end + + def gen_proxy() + h_file("#{@dir}/Proxy.h") { + include "#{@dir}/specification" + namespace(@ns) { + genl "template <class F, class R=F::result_type>" + cpp_class("ProxyTemplate") { + public + genl "ProxyTemplate(F f) : functor(f) {}" + @amqp.classes.each { |c| + c.actions.each { |a| + scope("R #{a.funcname}(", ")") { genl a.parameters } + scope() { + var=a.name.funcname + scope("#{a.classname} #{var}(",");") { genl a.arguments } + genl "return functor(#{var});" + } + } + } + private + genl "F functor;" + } + } + } + end + + def generate + gen_specification + gen_proxy + end +end + +Specification.new($outdir, $amqp).generate(); + diff --git a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb index 18f74a2e41..f9ef95f5a0 100755 --- a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb +++ b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb @@ -23,5 +23,5 @@ class MethodBodyConstVisitorGen < CppGen end end -MethodBodyConstVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyConstVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb index 4944b10a84..a74b0c06d6 100755 --- a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb +++ b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb @@ -31,5 +31,5 @@ class MethodBodyDefaultVisitorGen < CppGen end end -MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/MethodHolder.rb b/qpid/cpp/rubygen/99-0/MethodHolder.rb index 7e915677b2..a708db6676 100755 --- a/qpid/cpp/rubygen/templates/MethodHolder.rb +++ b/qpid/cpp/rubygen/99-0/MethodHolder.rb @@ -96,5 +96,5 @@ EOS end end -MethodHolderGen.new(Outdir, Amqp).generate(); +MethodHolderGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/Operations.rb b/qpid/cpp/rubygen/99-0/Operations.rb index 91007ef3e1..c985bb6105 100755 --- a/qpid/cpp/rubygen/templates/Operations.rb +++ b/qpid/cpp/rubygen/99-0/Operations.rb @@ -91,6 +91,6 @@ EOS end end -OperationsGen.new("client",ARGV[0], Amqp).generate() -OperationsGen.new("server",ARGV[0], Amqp).generate() +OperationsGen.new("client",ARGV[0], $amqp).generate() +OperationsGen.new("server",ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/OperationsInvoker.rb b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb index 747dd06189..642f98ce8e 100755 --- a/qpid/cpp/rubygen/templates/OperationsInvoker.rb +++ b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb @@ -88,5 +88,5 @@ class OperationsInvokerGen < CppGen end end -OperationsInvokerGen.new("client",ARGV[0], Amqp).generate() -OperationsInvokerGen.new("server",ARGV[0], Amqp).generate() +OperationsInvokerGen.new("client",ARGV[0], $amqp).generate() +OperationsInvokerGen.new("server",ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/Proxy.rb b/qpid/cpp/rubygen/99-0/Proxy.rb index 467476506c..2829884673 100755 --- a/qpid/cpp/rubygen/templates/Proxy.rb +++ b/qpid/cpp/rubygen/99-0/Proxy.rb @@ -62,7 +62,9 @@ EOS include "<sstream>" include "#{@classname}.h" include "qpid/framing/amqp_types_full.h" - Amqp.methods_on(@chassis).each { |m| include "qpid/framing/"+m.body_name } + @amqp.methods_on(@chassis).each { + |m| include "qpid/framing/"+m.body_name + } genl namespace("qpid::framing") { genl "#{@classname}::#{@classname}(FrameHandler& f) :" @@ -75,6 +77,6 @@ EOS end -ProxyGen.new("client", Outdir, Amqp).generate; -ProxyGen.new("server", Outdir, Amqp).generate; +ProxyGen.new("client", $outdir, $amqp).generate; +ProxyGen.new("server", $outdir, $amqp).generate; diff --git a/qpid/cpp/rubygen/templates/Session.rb b/qpid/cpp/rubygen/99-0/Session.rb index 6a50fdb462..e01a28a62d 100644 --- a/qpid/cpp/rubygen/templates/Session.rb +++ b/qpid/cpp/rubygen/99-0/Session.rb @@ -190,6 +190,6 @@ EOS end end -SessionNoKeywordGen.new(ARGV[0], Amqp).generate() -SessionGen.new(ARGV[0], Amqp).generate() +SessionNoKeywordGen.new(ARGV[0], $amqp).generate() +SessionGen.new(ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/all_method_bodies.rb b/qpid/cpp/rubygen/99-0/all_method_bodies.rb index d06f459493..5971d49189 100755 --- a/qpid/cpp/rubygen/templates/all_method_bodies.rb +++ b/qpid/cpp/rubygen/99-0/all_method_bodies.rb @@ -17,5 +17,5 @@ class AllMethodBodiesGen < CppGen end end -AllMethodBodiesGen.new(Outdir, Amqp).generate(); +AllMethodBodiesGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/constants.rb b/qpid/cpp/rubygen/99-0/constants.rb index 5fbbefe218..b5f559d504 100755 --- a/qpid/cpp/rubygen/templates/constants.rb +++ b/qpid/cpp/rubygen/99-0/constants.rb @@ -78,5 +78,5 @@ class ConstantsGen < CppGen end end -ConstantsGen.new(Outdir, Amqp).generate(); +ConstantsGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/frame_body_lists.rb b/qpid/cpp/rubygen/99-0/frame_body_lists.rb index 634001ab14..b20e4550f3 100644 --- a/qpid/cpp/rubygen/templates/frame_body_lists.rb +++ b/qpid/cpp/rubygen/99-0/frame_body_lists.rb @@ -26,6 +26,6 @@ EOS end end -FrameBodyListsGen.new(ARGV[0], Amqp).generate; +FrameBodyListsGen.new(ARGV[0], $amqp).generate; diff --git a/qpid/cpp/rubygen/templates/structs.rb b/qpid/cpp/rubygen/99-0/structs.rb index edbadb01f6..336591be00 100644 --- a/qpid/cpp/rubygen/templates/structs.rb +++ b/qpid/cpp/rubygen/99-0/structs.rb @@ -534,5 +534,5 @@ EOS end end -StructGen.new(ARGV[0], Amqp).generate() +StructGen.new(ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb index d1212153ca..1fff1d51db 100755 --- a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb +++ b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb @@ -30,5 +30,5 @@ class MethodBodyDefaultVisitorGen < CppGen end end -MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb index f998909ec8..b1e635a27b 100755 --- a/qpid/cpp/rubygen/amqpgen.rb +++ b/qpid/cpp/rubygen/amqpgen.rb @@ -119,7 +119,16 @@ class AmqpElement # List of children of type elname, or all children if elname # not specified. def children(elname=nil) - @cache_children[elname] ||= @children.select { |c| elname==c.xml.name } + if elname + @cache_children[elname] ||= @children.select { |c| elname==c.xml.name } + else + @children + end + end + + def each_descendant(&block) + yield self + @children.each { |c| c.each_descendant(&block) } end # Look up child of type elname with attribute name. @@ -127,6 +136,9 @@ class AmqpElement @cache_child[[elname,name]] ||= children(elname).find { |c| c.name==name } end + # Fully qualified amqp dotted name of this element + def dotted_name() (parent ? parent.dotted_name+"." : "") + name; end + # The root <amqp> element. def root() @root ||=parent ? parent.root : self; end @@ -140,9 +152,22 @@ class AmqpElement # Text of doc child if there is one. def doc() d=xml.elements["doc"]; d and d.text; end + def fqname() + throw "fqname: #{self} #{parent.fqname} has no name" unless name + p=parent && parent.fqname + p ? p+"."+name : name; + end + + def containing_class() + return self if is_a? AmqpClass + return parent && parent.containing_class + end + end -AmqpResponse = AmqpElement +class AmqpResponse < AmqpElement + def initialize(xml, parent) super; end +end class AmqpDoc < AmqpElement def initialize(xml,parent) super; end @@ -153,18 +178,32 @@ class AmqpChoice < AmqpElement def initialize(xml,parent) super; end amqp_attr_reader :name, :value end - + class AmqpEnum < AmqpElement def initialize(xml,parent) super; end amqp_child_reader :choice end +# 0-10 array domains are missing element type information, add it here. +ArrayTypes={ + "str16-array" => "str-16", + "amqp-host-array" => "connection.amqp-host-url", + "command-fragments" => "session.command-fragment", + "in-doubt" => "dtx.xid" +} + class AmqpDomain < AmqpElement - def initialize(xml, parent) super; end + def initialize(xml, parent) + super + root.used_by[uses].push(fqname) if uses and uses.index('.') + end + amqp_attr_reader :type amqp_single_child_reader :struct # preview amqp_single_child_reader :enum + def uses() type_=="array" ? ArrayTypes[name] : type_; end + def unalias() d=self while (d.type_ != d.name and root.domain(d.type_)) @@ -180,10 +219,13 @@ class AmqpException < AmqpElement end class AmqpField < AmqpElement - def initialize(xml, amqp) super; end; + def initialize(xml, amqp) + super; + root.used_by[type_].push(parent.fqname) if type_ and type_.index('.') + end + def domain() root.domain(xml.attributes["domain"]); end amqp_single_child_reader :struct # preview - # FIXME aconway 2008-02-21: exceptions in fields - need to update c++ mapping. amqp_child_reader :exception amqp_attr_reader :type, :default, :code, :required end @@ -198,15 +240,11 @@ class AmqpConstant < AmqpElement amqp_attr_reader :value, :class end -# FIXME aconway 2008-02-21: -# class AmqpResponse < AmqpElement -# def initialize(xml, parent) super; end -# end - class AmqpResult < AmqpElement def initialize(xml, parent) super; end amqp_single_child_reader :struct # preview amqp_attr_reader :type + def name() "result"; end end class AmqpEntry < AmqpElement @@ -236,8 +274,8 @@ class AmqpStruct < AmqpElement amqp_attr_reader :size, :code, :pack amqp_child_reader :field - alias :raw_pack :pack # preview - preview code needs default "short" for pack. + alias :raw_pack :pack def pack() raw_pack or (not parent.final? and "short"); end def result?() parent.xml.name == "result"; end def domain?() parent.xml.name == "domain"; end @@ -265,17 +303,21 @@ class AmqpRole < AmqpElement amqp_attr_reader :implement end -class AmqpControl < AmqpElement +# Base class for command and control. +class AmqpAction < AmqpElement def initialize(xml,amqp) super; end amqp_child_reader :implement, :field, :response amqp_attr_reader :code end -class AmqpCommand < AmqpElement +class AmqpControl < AmqpAction + def initialize(xml,amqp) super; end +end + +class AmqpCommand < AmqpAction def initialize(xml,amqp) super; end - amqp_child_reader :implement, :field, :exception, :response + amqp_child_reader :exception amqp_single_child_reader :result, :segments - amqp_attr_reader :code end class AmqpClass < AmqpElement @@ -296,12 +338,19 @@ class AmqpClass < AmqpElement def l4?() # preview !["connection", "session", "execution"].include?(name) end + + def actions() controls+commands; end end class AmqpType < AmqpElement + def initialize(xml,amqp) super; end amqp_attr_reader :code, :fixed_width, :variable_width end +class AmqpXref < AmqpElement + def initialize(xml,amqp) super; end +end + # AMQP root element. class AmqpRoot < AmqpElement amqp_attr_reader :major, :minor, :port, :comment @@ -314,14 +363,16 @@ class AmqpRoot < AmqpElement raise "No XML spec files." if specs.empty? xml=parse(specs.shift) specs.each { |s| xml_merge(xml, parse(s)) } + @used_by=Hash.new{ |h,k| h[k]=[] } super(xml, nil) end + attr_reader :used_by + + def merge(root) xml_merge(xml, root.xml); end + def version() major + "-" + minor; end - # Find a child node from a dotted amqp name, e.g. message.transfer - def lookup(dotted_name) elements[dotted_name.gsub(/\./,"/")]; end - # preview - only struct child reader remains for new mapping def domain_structs() domains.map{ |d| d.struct }.compact; end def result_structs() @@ -337,6 +388,8 @@ class AmqpRoot < AmqpElement @methods_on[chassis] ||= classes.map { |c| c.methods_on(chassis) }.flatten end + def fqname() nil; end + # TODO aconway 2008-02-21: methods by role. private @@ -353,7 +406,7 @@ end # Collect information about generated files. class GenFiles @@files =[] - def GenFiles.add(f) @@files << f; puts f; end + def GenFiles.add(f) @@files << f; end def GenFiles.get() @@files; end end @@ -366,26 +419,38 @@ class Generator def initialize (outdir, amqp) @amqp=amqp @outdir=outdir - @prefix='' # For indentation or comments. + @prefix=[''] # For indentation or comments. @indentstr=' ' # One indent level. @outdent=2 - Pathname.new(@outdir).mkpath unless @outdir=="-" or File.directory?(@outdir) + Pathname.new(@outdir).mkpath unless @outdir=="-" end # Create a new file, set @out. - def file(file) + def file(file, &block) GenFiles.add file if (@outdir != "-") - path=Pathname.new "#{@outdir}/#{file}" - path.parent.mkpath - path.open('w') { |@out| yield } + @path=Pathname.new "#{@outdir}/#{file}" + @path.parent.mkpath + @out=String.new # Generate in memory first + if block then yield; endfile; end + end + end + + def endfile() + if @outdir != "-" + if @path.exist? and @path.read == @out + puts "Skipped #{@path} - unchanged" # Dont generate if unchanged + else + @path.open('w') { |f| f << @out } + puts "Generated #{@path}" + end end end # Append multi-line string to generated code, prefixing each line. def gen (str) str.each_line { |line| - @out << @prefix unless @midline + @out << @prefix.last unless @midline @out << line @midline = nil } @@ -400,23 +465,25 @@ class Generator end # Generate code with added prefix. - def prefix(add) - save=@prefix - @prefix+=add - yield - @prefix=save + def prefix(add, &block) + @prefix.push @prefix.last+add + if block then yield; endprefix; end + end + + def endprefix() + @prefix.pop end # Generate indented code def indent(n=1,&block) prefix(@indentstr * n,&block); end + alias :endindent :endprefix # Generate outdented code def outdent(&block) - save=@prefix - @prefix=@prefix[0...-2] - yield - @prefix=save + @prefix.push @prefix.last[0...-2] + if block then yield; endprefix; end end + alias :endoutdent :endprefix attr_accessor :out end diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb index 60a653e18d..edee72e0bd 100755 --- a/qpid/cpp/rubygen/cppgen.rb +++ b/qpid/cpp/rubygen/cppgen.rb @@ -54,12 +54,31 @@ CppKeywords = Set.new(["and", "and_eq", "asm", "auto", "bitand", CppMangle = CppKeywords+Set.new(["string"]) class String - def cppsafe() - CppMangle.include?(self) ? self+"_" : self + def cppsafe() CppMangle.include?(self) ? self+"_" : self; end + + def amqp2cpp() + path=split(".") + name=path.pop + return name.typename if path.empty? + path.map! { |n| n.nsname } + return (path << name.caps).join("::") end + + alias :typename :caps + alias :nsname :bars + alias :constname :shout + alias :funcname :lcaps + alias :varname :lcaps end # Hold information about a C++ type. +# +# preview - new mapping does not use CppType, +# Each amqp type corresponds exactly by dotted name +# to a type, domain or struct, which in turns +# corresponds by name to a C++ type or typedef. +# (see String.amqp2cpp) +# class CppType def initialize(name) @name=@param=@ret=name; end attr_reader :name, :param, :ret, :code @@ -93,11 +112,22 @@ class CppType def to_s() name; end; end +class AmqpElement + def cppfqname() + names=parent.dotted_name.split(".") + # Field children are moved up to method in C++b + prefix.pop if parent.is_a? AmqpField + prefix.push cppname + prefix.join("::") + end +end + class AmqpField def cppname() name.lcaps.cppsafe; end def cpptype() domain.cpptype; end def bit?() domain.type_ == "bit"; end def signature() cpptype.param+" "+cppname; end + def paramtype() "call_traits<#{type_.amqp2cpp}>::param_type"; end end class AmqpMethod @@ -107,8 +137,41 @@ class AmqpMethod def body_name() parent.name.caps+name.caps+"Body"; end end +module AmqpHasFields + def parameters() + fields.map { |f| "#{f.paramtype} #{f.cppname}_"}.join(",\n") + end + + def arguments() + fields.map { |f| "#{f.cppname}_"}.join(",\n") + end + + def values() + fields.map { |f| "#{f.cppname}"}.join(",\n") + end + + def initializers() + fields.map { |f| "#{f.cppname}(#{f.cppname}_)"}.join(",\n") + end +end + +class AmqpAction + def classname() name.typename; end + def funcname() parent.name.funcname + name.caps; end + include AmqpHasFields +end + +class AmqpCommand < AmqpAction + def base() "Command"; end +end + +class AmqpControl < AmqpAction + def base() "Control"; end +end + class AmqpClass - def cppname() name.caps; end + def cppname() name.caps; end # preview + def nsname() name.nsname; end end class AmqpDomain @@ -150,18 +213,26 @@ class AmqpResult end class AmqpStruct - def cpp_pack_type() AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t"); end - def cpptype() parent.cpptype; end - def cppname() cpptype.name; end + include AmqpHasFields + + def cpp_pack_type() # preview + AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t"); + end + def cpptype() parent.cpptype; end # preview + def cppname() cpptype.name; end # preview + + def classname() name.typename; end end class CppGen < Generator def initialize(outdir, *specs) super(outdir,*specs) + # need to sort classes for dependencies + @actions=[] # Stack of end-scope actions end # Write a header file. - def h_file(path) + def h_file(path, &block) path = (/\.h$/ === path ? path : path+".h") guard=path.upcase.tr('./-','_') file(path) { @@ -169,12 +240,12 @@ class CppGen < Generator gen "#define #{guard}\n" gen Copyright yield - gen "#endif /*!#{guard}*/\n" + gen "#endif /*!#{guard}*/\n" } end # Write a .cpp file. - def cpp_file(path) + def cpp_file(path, &block) path = (/\.cpp$/ === path ? path : path+".cpp") file(path) do gen Copyright @@ -188,10 +259,12 @@ class CppGen < Generator genl "#include #{header}" end - def scope(open="{",close="}", &block) - genl open; indent(&block); genl close + def scope(open="{",close="}", &block) + genl open + indent &block + genl close end - + def namespace(name, &block) genl names = name.split("::") @@ -210,7 +283,7 @@ class CppGen < Generator indent { gen "#{bases.join(",\n")}" } end genl - scope("{","};") { yield } + scope("{","};", &block) end def struct(name, *bases, &block) @@ -242,6 +315,11 @@ class CppGen < Generator prefix(" * ",&block) genl " */" end + + # Generate code in namespace for each class + def each_class_ns() + @amqp.classes.each { |c| namespace(c.nsname) { yield c } } + end end # Fully-qualified class name diff --git a/qpid/cpp/rubygen/generate b/qpid/cpp/rubygen/generate index 94b194aaa4..d094be4f41 100755 --- a/qpid/cpp/rubygen/generate +++ b/qpid/cpp/rubygen/generate @@ -1,5 +1,6 @@ #!/usr/bin/env ruby require 'amqpgen' +require 'pathname' # # Run a set of code generation templates. @@ -18,17 +19,39 @@ EOS exit 1 end -Outdir=ARGV[0] -Specs=ARGV.grep(/\.xml$/) -Amqp=AmqpRoot.new(*Specs) +# Create array of specs by version +def parse_specs(specs) + roots={ } + specs.each { |spec| + root=AmqpRoot.new(spec) + ver=root.version + if (roots[ver]) + roots[ver].merge(root) + else + roots[ver]=root + end + } + roots +end # Run selected templates if ARGV.any? { |arg| arg=="all" } - templates=Dir["#{File.dirname __FILE__}/templates/*.rb"] + templates=Dir["#{File.dirname __FILE__}/*/*.rb"] else templates=ARGV.grep(/\.rb$/) end -templates.each { |t| load t } + +$outdir=ARGV[0] +$models=parse_specs(ARGV.grep(/\.xml$/)) +templates.each { |t| + ver=Pathname.new(t).dirname.basename.to_s + $amqp=$models[ver] + if $amqp + load t + else + puts "Warning: skipping #{t}, no spec file for version #{ver}." + end +} def make_continue(lines) lines.join(" \\\n "); end @@ -40,7 +63,7 @@ if makefile generator_files=Dir["**/*.rb"] << File.basename(__FILE__) Dir.chdir dir rgen_generator=generator_files.map{ |f| "$(rgen_dir)/#{f}" } - rgen_srcs=GenFiles.get.map{ |f| "#{Outdir}/#{f}" } + rgen_srcs=GenFiles.get.map{ |f| "#{$outdir}/#{f}" } File.open(makefile, 'w') { |out| out << <<EOS @@ -51,13 +74,13 @@ rgen_generator=#{make_continue rgen_generator} rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))} -rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))} +rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r{qpid/(framing|amqp_.+)/.+\.cpp$}))} rgen_srcs=#{make_continue rgen_srcs} # Header file install rules. EOS - ["framing", "client/no_keyword","client", "broker"].each { |ns| + ["amqp_0_10", "framing", "client/no_keyword","client", "broker"].each { |ns| dir="qpid/#{ns}" dir_ = dir.tr("/", "_") regex=%r|#{dir}/[^/]+\.h$| @@ -69,7 +92,7 @@ EOS } out << <<EOS if GENERATE -$(rgen_srcs) $(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs) +$(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs) $(rgen_cmd) # Empty rule in case a generator file is renamed/removed. diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index b44aaa7e5e..e3b95e045f 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -11,11 +11,13 @@ EXTRA_DIST= $(platform_dist) # This phony target is needed by generated makefile fragments: force: -# AMQP_XML is defined in ../configure.ac -specs=@AMQP_XML@ $(top_srcdir)/xml/cluster.xml - if GENERATE +# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac +amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml +amqp_0_10_xml=@AMQP_FINAL_XML@ +specs=$(amqp_99_0_xml) $(amqp_0_10_xml) + # Ruby generator. rgen_dir=$(top_srcdir)/rubygen rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate $(srcdir)/gen $(specs) all $(srcdir)/rubygen.mk @@ -101,6 +103,7 @@ libqpidcommon_la_LIBADD = \ libqpidcommon_la_SOURCES = \ $(rgen_common_cpp) \ $(platform_src) \ + qpid/amqp_0_10/helpers.cpp \ qpid/Serializer.h \ qpid/amqp_0_10/built_in_types.h \ qpid/amqp_0_10/Codec.h \ @@ -166,6 +169,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PreviewConnection.cpp \ qpid/broker/PreviewConnectionHandler.cpp \ qpid/broker/PreviewSessionHandler.cpp \ + qpid/broker/PreviewSessionManager.cpp \ + qpid/broker/PreviewSessionState.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ @@ -247,6 +252,7 @@ libqpidclient_la_SOURCES = \ nobase_include_HEADERS = \ $(platform_hdr) \ + qpid/amqp_0_10/helpers.h \ qpid/assert.h \ qpid/DataDir.h \ qpid/Exception.h \ @@ -270,6 +276,8 @@ nobase_include_HEADERS = \ qpid/broker/PreviewConnection.h \ qpid/broker/PreviewConnectionHandler.h \ qpid/broker/PreviewSessionHandler.h \ + qpid/broker/PreviewSessionManager.h \ + qpid/broker/PreviewSessionState.h \ qpid/broker/Connection.h \ qpid/broker/ConnectionState.h \ qpid/broker/ConnectionFactory.h \ diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp index 07f157cfc3..17f0d5029c 100644 --- a/qpid/cpp/src/qpid/Exception.cpp +++ b/qpid/cpp/src/qpid/Exception.cpp @@ -32,20 +32,31 @@ std::string strError(int err) { return std::string(strerror_r(err, buf, sizeof(buf))); } -Exception::Exception(const std::string& s) throw() : msg(s) { - QPID_LOG(warning, "Exception: " << msg); +Exception::Exception(const std::string& msg, + const std::string& nm, + uint16_t cd) throw() + : message(msg), name(nm), code(cd), + whatStr((name.empty() ? "" : name + ": ")+ msg) +{ + QPID_LOG(warning, "Exception: " << whatStr); } Exception::~Exception() throw() {} -std::string Exception::str() const throw() { - if (msg.empty()) - const_cast<std::string&>(msg).assign(typeid(*this).name()); - return msg; +std::string Exception::getMessage() const throw() { return message; } + +std::string Exception::getName() const throw() { + return name.empty() ? typeid(*this).name() : name; } -const char* Exception::what() const throw() { return str().c_str(); } +uint16_t Exception::getCode() const throw() { return code; } + +const char* Exception::what() const throw() { + if (whatStr.empty()) return typeid(*this).name(); + else return whatStr.c_str(); +} -const std::string ClosedException::CLOSED_MESSAGE("Closed"); +ClosedException::ClosedException(const std::string& msg) + : Exception(msg, "ClosedException") {} } // namespace qpid diff --git a/qpid/cpp/src/qpid/Exception.h b/qpid/cpp/src/qpid/Exception.h index 57c7a21234..00365018ba 100644 --- a/qpid/cpp/src/qpid/Exception.h +++ b/qpid/cpp/src/qpid/Exception.h @@ -40,13 +40,27 @@ std::string strError(int err); class Exception : public std::exception { public: - explicit Exception(const std::string& str=std::string()) throw(); + explicit Exception(const std::string& message=std::string(), + const std::string& name=std::string(), + uint16_t code=0) throw(); + virtual ~Exception() throw(); + + // returns "name: message" + virtual const char* what() const throw(); + + virtual std::string getName() const throw(); + virtual std::string getMessage() const throw(); + virtual uint16_t getCode() const throw(); + + // FIXME aconway 2008-02-21: backwards compat, remove? + std::string str() const throw() { return getMessage(); } - virtual const char *what() const throw(); - virtual std::string str() const throw(); private: - std::string msg; + const std::string message; + const std::string name; + const uint16_t code; + const std::string whatStr; }; struct ChannelException : public Exception { @@ -62,8 +76,7 @@ struct ConnectionException : public Exception { }; struct ClosedException : public Exception { - static const std::string CLOSED_MESSAGE; - ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {} + ClosedException(const std::string& msg=std::string()); }; } // namespace qpid diff --git a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h index 6cd9c72367..445f07459c 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h +++ b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h @@ -29,6 +29,7 @@ #include <boost/array.hpp> #include <stdint.h> #include <string> +#include <vector> /**@file Mapping from built-in AMQP types to C++ types */ @@ -66,7 +67,7 @@ typedef double Double; typedef float Float; typedef framing::SequenceNumber SequenceNo; using framing::Uuid; -typedef sys::AbsTime DateTime; +typedef sys::AbsTime Datetime; typedef Decimal<Uint8, Int32> Dec32; typedef Decimal<Uint8, Int64> Dec64; @@ -89,14 +90,18 @@ typedef CodableString<Uint16, Uint16> Str16Utf16; typedef CodableString<Uint8, Uint32> Vbin32; -/** FIXME aconway 2008-02-20: todo -byte-ranges 2 byte ranges within a 64-bit payload -sequence-set 2 ranged set representation -map 0xa8 4 a mapping of keys to typed values -list 0xa9 4 a series of consecutive type-value pairs -array 0xaa 4 a defined length collection of values of a single type -struct32 0xab 4 a coded struct with a 32-bit size -*/ +// FIXME aconway 2008-02-26: array encoding +template <class T> struct Array : public std::vector<T> {}; + +// FIXME aconway 2008-02-26: Unimplemented types: +struct ByteRanges {}; +struct SequenceSet {}; +struct Map {}; +struct List {}; +struct Struct32 {}; + +// Top level enum definitions. +enum SegmentType { CONTROL, COMMAND, HEADER, BODY }; }} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp new file mode 100644 index 0000000000..4333a2cd92 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "helpers.h" + +namespace qpid { +namespace amqp_0_10 { + +Control::~Control() {} +Command::~Command() {} +Struct::~Struct() {} + +}} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.h b/qpid/cpp/src/qpid/amqp_0_10/helpers.h new file mode 100644 index 0000000000..1769d374d9 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.h @@ -0,0 +1,67 @@ +#ifndef QPID_AMQP_0_10_HELPERS_H +#define QPID_AMQP_0_10_HELPERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the +n * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +namespace qpid { + +namespace amqp_0_10 { + +// Look up names by code +const char* getClassName(uint8_t code); +const char* getCommandName(uint8_t classCode, uint8_t code); +const char* getControlName(uint8_t classCode, uint8_t code); +const char* getStructName(uint8_t classCode, uint8_t code); + +struct Command { + virtual ~Command(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +struct Control { + virtual ~Control(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +struct Struct { + virtual ~Struct(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +/** Base class for generated enum domains. + * Enums map to classes for type safety and to provide separate namespaces + * for clashing values. + */ +struct Enum { + int value; + Enum(int v=0) : value(v) {} + operator int() const { return value; } + template <class S> void serialize(S &s) { s(value); } +}; + +}} // namespace qpid::amqp_0_10 + +#endif /*!QPID_AMQP_0_10_HELPERS_H*/ diff --git a/qpid/cpp/src/qpid/amqp_0_10/visitors.h b/qpid/cpp/src/qpid/amqp_0_10/visitors.h new file mode 100644 index 0000000000..3835f37f3e --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/visitors.h @@ -0,0 +1,15 @@ +// Visitors +template <class Base> struct Visitor; +template <class Base, class F, class R> FunctorVisitor; + +/** Template base implementation for visitables. */ +template <class Base, class Derived> +struct VisitableBase : public Base { + virtual void accept(Visitor<Derived>& v) { + v.visit(static_cast<Derived>&(*this)); + } + virtual void accept(Visitor<Derived>& v) const { + v.visit(static_cast<const Derived>&(*this)); + } +}; + diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 0a0eb0a0df..9bfa868d9c 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -109,7 +109,8 @@ Broker::Broker(const Broker::Options& conf) : store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), factory(*this), - sessionManager(conf.ack) + sessionManager(conf.ack), + previewSessionManager(conf.ack) { // Early-Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 55bc7644a5..153eabc6b3 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -30,6 +30,7 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" +#include "PreviewSessionManager.h" #include "Vhost.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" @@ -109,6 +110,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DataDir& getDataDir() { return dataDir; } SessionManager& getSessionManager() { return sessionManager; } + PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; } management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -136,6 +138,7 @@ class Broker : public sys::Runnable, public Plugin::Target, ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; + PreviewSessionManager previewSessionManager; management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h index c8c1e12f28..ef2c51bb8d 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h @@ -68,7 +68,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } - framing::ProtocolVersion getVersion() const { return session.getVersion();} + framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();} AccessHandler* getAccessHandler() { diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h index 410d400c9d..4c51e2a826 100644 --- a/qpid/cpp/src/qpid/broker/HandlerImpl.h +++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h @@ -20,7 +20,7 @@ */ #include "SemanticState.h" -#include "SessionState.h" +#include "SessionContext.h" #include "ConnectionState.h" namespace qpid { @@ -35,13 +35,13 @@ class Broker; class HandlerImpl { protected: SemanticState& state; - SessionState& session; + SessionContext& session; HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } ConnectionState& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getBroker(); } + Broker& getBroker() { return session.getConnection().getBroker(); } }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp index 19e6a235c4..36092bb7f6 100644 --- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp @@ -19,7 +19,7 @@ */ #include "PreviewSessionHandler.h" -#include "SessionState.h" +#include "PreviewSessionState.h" #include "PreviewConnection.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" @@ -36,7 +36,7 @@ using namespace std; using namespace qpid::sys; PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -106,15 +106,15 @@ void PreviewSessionHandler::assertClosed(const char* method) const { void PreviewSessionHandler::open(uint32_t detachedLifetime) { assertClosed("open"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); + std::auto_ptr<PreviewSessionState> state( + connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); peerSession.attached(session->getId(), session->getTimeout()); } void PreviewSessionHandler::resume(const Uuid& id) { assertClosed("resume"); - session = connection.broker.getSessionManager().resume(id); + session = connection.broker.getPreviewSessionManager().resume(id); session->attach(*this); SequenceNumber seq = session->resuming(); peerSession.attached(session->getId(), session->getTimeout()); @@ -154,7 +154,7 @@ void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) void PreviewSessionHandler::localSuspend() { if (session.get() && session->isAttached()) { session->detach(); - connection.broker.getSessionManager().suspend(session); + connection.broker.getPreviewSessionManager().suspend(session); session.reset(); } } @@ -171,7 +171,7 @@ void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark, const SequenceNumberSet& /*seenFrameSet*/) { assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { + if (session->getState() == PreviewSessionState::RESUMING) { session->receivedAck(cumulativeSeenMark); framing::SessionState::Replay replay=session->replay(); std::for_each(replay.begin(), replay.end(), @@ -193,14 +193,14 @@ void PreviewSessionHandler::solicitAck() { void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) { - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); + std::auto_ptr<PreviewSessionState> state( + connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); } void PreviewSessionHandler::detached() { - connection.broker.getSessionManager().suspend(session); + connection.broker.getPreviewSessionManager().suspend(session); session.reset(); } diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h index e1096ebf9f..4c517367d7 100644 --- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h +++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h @@ -36,7 +36,7 @@ namespace qpid { namespace broker { class PreviewConnection; -class SessionState; +class PreviewSessionState; /** * A SessionHandler is associated with each active channel. It @@ -45,7 +45,7 @@ class SessionState; */ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, - public SessionContext, + public framing::FrameHandler::InOutHandler, private boost::noncopyable { public: @@ -53,8 +53,8 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand ~PreviewSessionHandler(); /** Returns 0 if not attached to a session */ - SessionState* getSession() { return session.get(); } - const SessionState* getSession() const { return session.get(); } + PreviewSessionState* getSession() { return session.get(); } + const PreviewSessionState* getSession() const { return session.get(); } framing::ChannelId getChannel() const { return channel.get(); } @@ -101,7 +101,7 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand framing::AMQP_ClientProxy proxy; framing::AMQP_ClientProxy::Session peerSession; bool ignoring; - std::auto_ptr<SessionState> session; + std::auto_ptr<PreviewSessionState> session; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp new file mode 100644 index 0000000000..ec73082817 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PreviewSessionManager.h" +#include "PreviewSessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/log/Helpers.h" +#include "qpid/memory.h" + +#include <boost/bind.hpp> +#include <boost/range.hpp> + +#include <algorithm> +#include <functional> +#include <ostream> + +namespace qpid { +namespace broker { + +using namespace sys; +using namespace framing; + +PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {} + +PreviewSessionManager::~PreviewSessionManager() {} + +// FIXME aconway 2008-02-01: pass handler*, allow open unattached. +std::auto_ptr<PreviewSessionState> PreviewSessionManager::open( + PreviewSessionHandler& h, uint32_t timeout_) +{ + Mutex::ScopedLock l(lock); + std::auto_ptr<PreviewSessionState> session( + new PreviewSessionState(this, &h, timeout_, ack)); + active.insert(session->getId()); + for_each(observers.begin(), observers.end(), + boost::bind(&Observer::opened, _1,boost::ref(*session))); + return session; +} + +void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) { + Mutex::ScopedLock l(lock); + active.erase(session->getId()); + session->suspend(); + session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); + if (session->mgmtObject.get() != 0) + session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); + suspended.push_back(session.release()); // In expiry order + eraseExpired(); +} + +std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id) +{ + Mutex::ScopedLock l(lock); + eraseExpired(); + if (active.find(id) != active.end()) + throw SessionBusyException( + QPID_MSG("Session already active: " << id)); + Suspended::iterator i = std::find_if( + suspended.begin(), suspended.end(), + boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1)) + ); + if (i == suspended.end()) + throw InvalidArgumentException( + QPID_MSG("No suspended session with id=" << id)); + active.insert(id); + std::auto_ptr<PreviewSessionState> state(suspended.release(i).release()); + return state; +} + +void PreviewSessionManager::erase(const framing::Uuid& id) +{ + Mutex::ScopedLock l(lock); + active.erase(id); +} + +void PreviewSessionManager::eraseExpired() { + // Called with lock held. + if (!suspended.empty()) { + Suspended::iterator keep = std::lower_bound( + suspended.begin(), suspended.end(), now(), + boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2)); + if (suspended.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); + suspended.erase(suspended.begin(), keep); + } + } +} + +void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) { + observers.push_back(o); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.h b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h new file mode 100644 index 0000000000..65ca49ec89 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h @@ -0,0 +1,100 @@ +#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H +#define QPID_BROKER_PREVIEWSESSIONMANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/framing/Uuid.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/Mutex.h> +#include <qpid/RefCounted.h> + +#include <boost/noncopyable.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +#include <set> +#include <vector> +#include <memory> + +namespace qpid { +namespace broker { + +class PreviewSessionState; +class PreviewSessionHandler; + +/** + * Create and manage PreviewSessionState objects. + */ +class PreviewSessionManager : private boost::noncopyable { + public: + /** + * Observer notified of PreviewSessionManager events. + */ + struct Observer : public RefCounted { + virtual void opened(PreviewSessionState&) {} + }; + + PreviewSessionManager(uint32_t ack); + + ~PreviewSessionManager(); + + /** Open a new active session, caller takes ownership */ + std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_); + + /** Suspend a session, start it's timeout counter. + * The factory takes ownership. + */ + void suspend(std::auto_ptr<PreviewSessionState> session); + + /** Resume a suspended session. + *@throw Exception if timed out or non-existant. + */ + std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&); + + /** Add an Observer. */ + void add(const intrusive_ptr<Observer>&); + + private: + typedef boost::ptr_vector<PreviewSessionState> Suspended; + typedef std::set<framing::Uuid> Active; + typedef std::vector<intrusive_ptr<Observer> > Observers; + + void erase(const framing::Uuid&); + void eraseExpired(); + + sys::Mutex lock; + Suspended suspended; + Active active; + uint32_t ack; + Observers observers; + + friend class PreviewSessionState; // removes deleted sessions from active set. +}; + + + +}} // namespace qpid::broker + + + + + +#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/ diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp new file mode 100644 index 0000000000..7188ffbf40 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PreviewSessionState.h" +#include "PreviewSessionManager.h" +#include "PreviewSessionHandler.h" +#include "ConnectionState.h" +#include "Broker.h" +#include "SemanticHandler.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace broker { + +using namespace framing; +using sys::Mutex; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; + +PreviewSessionState::PreviewSessionState( + PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack) + : framing::SessionState(ack, timeout_ > 0), + factory(f), handler(h), id(true), timeout(timeout_), + broker(h->getConnection().broker), + version(h->getConnection().getVersion()), + semanticHandler(new SemanticHandler(*this)) +{ + in.next = semanticHandler.get(); + out.next = &handler->out; + + getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Session::shared_ptr + (new management::Session (this, parent, id.str ())); + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h->getChannel()); + mgmtObject->set_detachedLifespan (getTimeout()); + agent->addObject (mgmtObject); + } + } +} + +PreviewSessionState::~PreviewSessionState() { + // Remove ID from active session list. + if (factory) + factory->erase(getId()); + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +PreviewSessionHandler* PreviewSessionState::getHandler() { + return handler; +} + +AMQP_ClientProxy& PreviewSessionState::getProxy() { + assert(isAttached()); + return getHandler()->getProxy(); +} + +ConnectionState& PreviewSessionState::getConnection() { + assert(isAttached()); + return getHandler()->getConnection(); +} + +void PreviewSessionState::detach() { + getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + Mutex::ScopedLock l(lock); + handler = 0; out.next = 0; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (0); + } +} + +void PreviewSessionState::attach(PreviewSessionHandler& h) { + { + Mutex::ScopedLock l(lock); + handler = &h; + out.next = &handler->out; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + } + } + h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); +} + +void PreviewSessionState::activateOutput() +{ + Mutex::ScopedLock l(lock); + if (isAttached()) { + getConnection().outputTasks.activateOutput(); + } +} + //This class could be used as the callback for queue notifications + //if not attached, it can simply ignore the callback, else pass it + //on to the connection + +ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case management::Session::METHOD_DETACH : + if (handler != 0) + { + handler->detach(); + } + status = Manageable::STATUS_OK; + break; + + case management::Session::METHOD_CLOSE : + /* + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; + */ + + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.h b/qpid/cpp/src/qpid/broker/PreviewSessionState.h new file mode 100644 index 0000000000..6e8523317c --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.h @@ -0,0 +1,124 @@ +#ifndef QPID_BROKER_PREVIEWSESSION_H +#define QPID_BROKER_PREVIEWSESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Session.h" +#include "SessionContext.h" + +#include <boost/noncopyable.hpp> +#include <boost/scoped_ptr.hpp> + +#include <set> +#include <vector> +#include <ostream> + +namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + +namespace broker { + +class SemanticHandler; +class PreviewSessionHandler; +class PreviewSessionManager; +class Broker; +class ConnectionState; + +/** + * Broker-side session state includes sessions handler chains, which may + * themselves have state. + */ +class PreviewSessionState : public framing::SessionState, + public SessionContext, + public framing::FrameHandler::Chains, + public management::Manageable +{ + public: + ~PreviewSessionState(); + bool isAttached() { return handler; } + + void detach(); + void attach(PreviewSessionHandler& handler); + + + PreviewSessionHandler* getHandler(); + + /** @pre isAttached() */ + framing::AMQP_ClientProxy& getProxy(); + + /** @pre isAttached() */ + ConnectionState& getConnection(); + + uint32_t getTimeout() const { return timeout; } + Broker& getBroker() { return broker; } + framing::ProtocolVersion getVersion() const { return version; } + + /** OutputControl **/ + void activateOutput(); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + + // Normally SessionManager creates sessions. + PreviewSessionState(PreviewSessionManager*, + PreviewSessionHandler* out, + uint32_t timeout, + uint32_t ackInterval); + + + private: + PreviewSessionManager* factory; + PreviewSessionHandler* handler; + framing::Uuid id; + uint32_t timeout; + sys::AbsTime expiry; // Used by SessionManager. + Broker& broker; + framing::ProtocolVersion version; + sys::Mutex lock; + boost::scoped_ptr<SemanticHandler> semanticHandler; + management::Session::shared_ptr mgmtObject; + + friend class PreviewSessionManager; +}; + + +inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) { + return out << session.getId(); +} + +}} // namespace qpid::broker + + + +#endif /*!QPID_BROKER_SESSION_H*/ diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp index 2a79496144..fdde7ec18c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp @@ -22,7 +22,6 @@ #include "SemanticHandler.h" #include "SemanticState.h" #include "SessionContext.h" -#include "SessionState.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" #include "qpid/framing/ExecutionCompleteBody.h" @@ -37,9 +36,9 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(SessionState& s) : +SemanticHandler::SemanticHandler(SessionContext& s) : state(*this,s), session(s), - msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()), + msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()), ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2)) {} @@ -164,13 +163,8 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { - SessionContext* handler = session.getHandler(); - if (handler) { - uint32_t maxFrameSize = handler->getConnection().getFrameMax(); - MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize); - } else { - QPID_LOG(error, "Dropping message as session is no longer attached to a channel."); - } + uint32_t maxFrameSize = session.getConnection().getFrameMax(); + MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); return outgoing.hwm; } diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h index d7f3ec8799..893a0cbded 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.h +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h @@ -46,7 +46,7 @@ class AMQHeaderBody; namespace broker { -class SessionState; +class SessionContext; class SemanticHandler : public DeliveryAdapter, public framing::FrameHandler, @@ -56,7 +56,7 @@ class SemanticHandler : public DeliveryAdapter, typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; SemanticState state; - SessionState& session; + SessionContext& session; // TODO aconway 2007-09-20: Why are these on the handler rather than the // state? IncomingExecutionContext incoming; @@ -78,10 +78,10 @@ class SemanticHandler : public DeliveryAdapter, framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } //Connection& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getBroker(); } + Broker& getBroker() { return session.getConnection().getBroker(); } public: - SemanticHandler(SessionState& session); + SemanticHandler(SessionContext& session); //frame handler: void handle(framing::AMQFrame& frame); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 7b4035604f..9b44f31e14 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -19,7 +19,7 @@ * */ -#include "SessionState.h" +#include "SessionContext.h" #include "BrokerAdapter.h" #include "Queue.h" #include "Connection.h" @@ -56,7 +56,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::ptr_map; -SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) +SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) : session(ss), deliveryAdapter(da), prefetchSize(0), @@ -263,21 +263,16 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { - if (parent->getSession().isAttached() && accept(msg.payload)) { - allocateCredit(msg.payload); - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); - if (windowing || ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); - } - if (acquire && !ackExpected) { - queue->dequeue(0, msg.payload); - } - return true; - } else { - QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent); - return false; + allocateCredit(msg.payload); + DeliveryId deliveryTag = + parent->deliveryAdapter.deliver(msg, token); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + } + if (acquire && !ackExpected) { + queue->dequeue(0, msg.payload); } + return true; } bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) @@ -331,7 +326,7 @@ void SemanticState::cancel(ConsumerImpl& c) if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - Queue::tryAutoDelete(getSession().getBroker(), queue); + Queue::tryAutoDelete(session.getBroker(), queue); } } } @@ -352,7 +347,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); if (!cacheExchange || cacheExchange->getName() != exchangeName){ - cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName); + cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 7fc6e4167c..cc9c0e1e9b 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -45,7 +45,7 @@ namespace qpid { namespace broker { -class SessionState; +class SessionContext; /** * SemanticState holds the L3 and L4 state of an open session, whether @@ -98,7 +98,7 @@ class SemanticState : public framing::FrameHandler::Chains, typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; - SessionState& session; + SessionContext& session; DeliveryAdapter& deliveryAdapter; Queue::shared_ptr defaultQueue; ConsumerImplMap consumers; @@ -129,10 +129,10 @@ class SemanticState : public framing::FrameHandler::Chains, void cancel(ConsumerImpl&); public: - SemanticState(DeliveryAdapter&, SessionState&); + SemanticState(DeliveryAdapter&, SessionContext&); ~SemanticState(); - SessionState& getSession() { return session; } + SessionContext& getSession() { return session; } /** * Get named queue, never returns 0. diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index a27b43cf65..a289310b15 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -25,6 +25,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/OutputControl.h" #include "ConnectionState.h" @@ -33,17 +34,13 @@ namespace qpid { namespace broker { -class SessionContext : public framing::FrameHandler::InOutHandler +class SessionContext : public sys::OutputControl { public: - SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {} virtual ~SessionContext(){} virtual ConnectionState& getConnection() = 0; - virtual const ConnectionState& getConnection() const = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; - virtual const framing::AMQP_ClientProxy& getProxy() const = 0; - virtual void detach() = 0; - virtual framing::ChannelId getChannel() const = 0; + virtual Broker& getBroker() = 0; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 1cb10d0c19..0e3c9928d1 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,6 +21,7 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" +#include "ConnectionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" #include "qpid/framing/ClientInvoker.h" @@ -36,7 +37,7 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - return; - } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); - session->in.handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; - } else if (!ignoring) { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); + if (!ignoring) { + if (m && + (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || + invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + return; + } else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { + return; + } else { + throw ChannelErrorException( + QPID_MSG("Channel " << channel.get() << " is not open")); + } } } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. @@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) { } void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) + if (!session.get()) { + std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); + } } void SessionHandler::assertClosed(const char* method) const { @@ -208,4 +215,32 @@ void SessionHandler::detached() ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } +void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + assertAttached("complete"); + session->complete(cumulative, range); +} + +void SessionHandler::flush() +{ + assertAttached("flush"); + session->flush(); +} +void SessionHandler::sync() +{ + assertAttached("sync"); + session->sync(); +} + +void SessionHandler::noop() +{ + assertAttached("noop"); + session->noop(); +} + +void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //never actually sent by client at present +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 5a72bfb12d..e6bc463a82 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -28,7 +28,7 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelHandler.h" -#include "SessionContext.h" +#include "qpid/framing/SequenceNumber.h" #include <boost/noncopyable.hpp> @@ -36,16 +36,18 @@ namespace qpid { namespace broker { class Connection; +class ConnectionState; class SessionState; /** * A SessionHandler is associated with each active channel. It - * receives incoming frames, handles session commands and manages the + * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, - public SessionContext, + public framing::AMQP_ServerOperations::ExecutionHandler, + public framing::FrameHandler::InOutHandler, private boost::noncopyable { public: @@ -90,12 +92,17 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); void detached(); + //Execution methods: + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); + void flush(); + void noop(); + void result(uint32_t command, const std::string& data); + void sync(); void assertAttached(const char* method) const; void assertActive(const char* method) const; void assertClosed(const char* method) const; - Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp index aa7ac9a8bb..571d3365db 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.cpp +++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp @@ -45,7 +45,7 @@ SessionManager::~SessionManager() {} // FIXME aconway 2008-02-01: pass handler*, allow open unattached. std::auto_ptr<SessionState> SessionManager::open( - SessionContext& h, uint32_t timeout_) + SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); std::auto_ptr<SessionState> session( diff --git a/qpid/cpp/src/qpid/broker/SessionManager.h b/qpid/cpp/src/qpid/broker/SessionManager.h index 94956a83ed..7e8bd18f57 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.h +++ b/qpid/cpp/src/qpid/broker/SessionManager.h @@ -38,7 +38,7 @@ namespace qpid { namespace broker { class SessionState; -class SessionContext; +class SessionHandler; /** * Create and manage SessionState objects. @@ -57,7 +57,7 @@ class SessionManager : private boost::noncopyable { ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_); + std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_); /** Suspend a session, start it's timeout counter. * The factory takes ownership. diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index b6c59cfb3b..573a567da6 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -19,12 +19,16 @@ * */ #include "SessionState.h" -#include "SessionManager.h" -#include "SessionContext.h" -#include "ConnectionState.h" #include "Broker.h" +#include "ConnectionState.h" +#include "MessageDelivery.h" #include "SemanticHandler.h" +#include "SessionManager.h" +#include "SessionHandler.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/ServerInvoker.h" + +#include <boost/bind.hpp> namespace qpid { namespace broker { @@ -37,17 +41,17 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) : framing::SessionState(ack, timeout_ > 0), factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), version(h->getConnection().getVersion()), - semanticHandler(new SemanticHandler(*this)) + semanticState(*this, *this), + adapter(semanticState), + msgBuilder(&broker.getStore(), broker.getStagingThreshold()), + ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) { - in.next = semanticHandler.get(); - out.next = &handler->out; - - getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.addOutputTask(&semanticState); Manageable* parent = broker.GetVhostObject (); @@ -76,7 +80,7 @@ SessionState::~SessionState() { mgmtObject->resourceDestroy (); } -SessionContext* SessionState::getHandler() { +SessionHandler* SessionState::getHandler() { return handler; } @@ -91,20 +95,19 @@ ConnectionState& SessionState::getConnection() { } void SessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.removeOutputTask(&semanticState); Mutex::ScopedLock l(lock); - handler = 0; out.next = 0; + handler = 0; if (mgmtObject.get() != 0) { mgmtObject->set_attached (0); } } -void SessionState::attach(SessionContext& h) { +void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; - out.next = &handler->out; if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); @@ -112,7 +115,7 @@ void SessionState::attach(SessionContext& h) { mgmtObject->set_channelId (h.getChannel()); } } - h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + h.getConnection().outputTasks.addOutputTask(&semanticState); } void SessionState::activateOutput() @@ -165,5 +168,100 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } +void SessionState::handleCommand(framing::AMQMethodBody* method) +{ + SequenceNumber id = incoming.next(); + Invoker::Result invocation = invoke(adapter, *method); + incoming.complete(id); + + if (!invocation.wasHandled()) { + throw NotImplementedException("Not implemented"); + } else if (invocation.hasResult()) { + getProxy().getExecution().result(id.getValue(), invocation.getResult()); + } + if (method->isSync()) { + incoming.sync(id); + sendCompletion(); + } + //TODO: if window gets too large send unsolicited completion +} + +void SessionState::handleContent(AMQFrame& frame) +{ + intrusive_ptr<Message> msg(msgBuilder.getMessage()); + if (!msg) {//start of frameset will be indicated by frame flags + msgBuilder.start(incoming.next()); + msg = msgBuilder.getMessage(); + } + msgBuilder.handle(frame); + if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags + msg->setPublisher(&getConnection()); + semanticState.handle(msg); + msgBuilder.end(); + incoming.track(msg); + if (msg->getFrames().getMethod()->isSync()) { + incoming.sync(msg->getCommandId()); + sendCompletion(); + } + } +} + +void SessionState::handle(AMQFrame& frame) +{ + //TODO: make command handling more uniform, regardless of whether + //commands carry content. (For now, assume all single frame + //assmblies are non-content bearing and all content-bearing + //assmeblies will have more than one frame): + if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod()); + } else { + handleContent(frame); + } + +} + +DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +{ + uint32_t maxFrameSize = getConnection().getFrameMax(); + MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); + return outgoing.hwm; +} + +void SessionState::sendCompletion() +{ + SequenceNumber mark = incoming.getMark(); + SequenceNumberSet range = incoming.getRange(); + getProxy().getExecution().complete(mark.getValue(), range); +} + +void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + //record: + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + //ack messages: + semanticState.ackCumulative(mark.getValue()); + } + range.processRanges(ackOp); +} + +void SessionState::flush() +{ + incoming.flush(); + sendCompletion(); +} + +void SessionState::sync() +{ + incoming.sync(); + sendCompletion(); +} + +void SessionState::noop() +{ + incoming.noop(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 8a12e580b7..98c21a8ab5 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -27,10 +27,15 @@ #include "qpid/framing/SessionState.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/OutputControl.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" #include "qpid/management/Session.h" +#include "BrokerAdapter.h" +#include "DeliveryAdapter.h" +#include "MessageBuilder.h" +#include "SessionContext.h" +#include "SemanticState.h" +#include "IncomingExecutionContext.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -47,8 +52,7 @@ class AMQP_ClientProxy; namespace broker { -class SemanticHandler; -class SessionContext; +class SessionHandler; class SessionManager; class Broker; class ConnectionState; @@ -58,8 +62,8 @@ class ConnectionState; * themselves have state. */ class SessionState : public framing::SessionState, - public framing::FrameHandler::Chains, - public sys::OutputControl, + public SessionContext, + public DeliveryAdapter, public management::Manageable { public: @@ -67,10 +71,10 @@ class SessionState : public framing::SessionState, bool isAttached() { return handler; } void detach(); - void attach(SessionContext& handler); + void attach(SessionHandler& handler); - SessionContext* getHandler(); + SessionHandler* getHandler(); /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); @@ -85,6 +89,19 @@ class SessionState : public framing::SessionState, /** OutputControl **/ void activateOutput(); + void handle(framing::AMQFrame& frame); + void handleCommand(framing::AMQMethodBody* method); + void handleContent(framing::AMQFrame& frame); + + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); + void flush(); + void noop(); + void sync(); + void sendCompletion(); + + //delivery adapter methods: + DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); + // Manageable entry points management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t @@ -92,21 +109,32 @@ class SessionState : public framing::SessionState, // Normally SessionManager creates sessions. SessionState(SessionManager*, - SessionContext* out, + SessionHandler* out, uint32_t timeout, uint32_t ackInterval); private: + typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + SessionManager* factory; - SessionContext* handler; + SessionHandler* handler; framing::Uuid id; uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. Broker& broker; framing::ProtocolVersion version; sys::Mutex lock; - boost::scoped_ptr<SemanticHandler> semanticHandler; + + SemanticState semanticState; + BrokerAdapter adapter; + MessageBuilder msgBuilder; + + //execution state + IncomingExecutionContext incoming; + framing::Window outgoing; + RangedOperation ackOp; + management::Session::shared_ptr mgmtObject; friend class SessionManager; diff --git a/qpid/cpp/src/qpid/client/Session.h b/qpid/cpp/src/qpid/client/Session.h index 3293af60fe..5d91f289e2 100644 --- a/qpid/cpp/src/qpid/client/Session.h +++ b/qpid/cpp/src/qpid/client/Session.h @@ -27,7 +27,7 @@ namespace qpid { namespace client { /** - * Session is currently just an alias for Session_0_10 + * Session is currently just an alias for Session_99_0 * * \ingroup clientapi */ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index bca6c49c13..5152aa2e43 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -17,7 +17,7 @@ */ #include "Cluster.h" -#include "qpid/broker/SessionState.h" +#include "qpid/broker/PreviewSessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -32,18 +32,18 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::SessionState; +using broker::PreviewSessionState; namespace { // Beginning of inbound chain: send to cluster. struct ClusterSendHandler : public FrameHandler { - SessionState& session; + PreviewSessionState& session; Cluster& cluster; bool busy; Monitor lock; - ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} + ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} void handle(AMQFrame& f) { Mutex::ScopedLock l(lock); @@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) { c.next = h; } -struct SessionObserver : public broker::SessionManager::Observer { +struct SessionObserver : public broker::PreviewSessionManager::Observer { Cluster& cluster; SessionObserver(Cluster& c) : cluster(c) {} - void opened(SessionState& s) { + void opened(PreviewSessionState& s) { // FIXME aconway 2008-01-29: IList for memory management. ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index b62b2be5f1..733db8003d 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -62,7 +62,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler virtual ~Cluster(); // FIXME aconway 2008-01-29: - intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; } + intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -116,7 +116,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - intrusive_ptr<broker::SessionManager::Observer> observer; + intrusive_ptr<broker::PreviewSessionManager::Observer> observer; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index ceafa389b0..0ea3953175 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin { cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getSessionManager().add(cluster->getObserver()); + broker->getPreviewSessionManager().add(cluster->getObserver()); } } }; diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h index 1be8856c13..b15e14d6f6 100644 --- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h +++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -33,6 +33,7 @@ namespace qpid { namespace framing { static ProtocolVersion highestProtocolVersion(99, 0); +//static ProtocolVersion highestProtocolVersion(0, 10); } /* namespace framing */ } /* namespace qpid */ diff --git a/qpid/cpp/src/qpid/framing/Proxy.h b/qpid/cpp/src/qpid/framing/Proxy.h index b6ac897e96..86b99a83b0 100644 --- a/qpid/cpp/src/qpid/framing/Proxy.h +++ b/qpid/cpp/src/qpid/framing/Proxy.h @@ -39,6 +39,7 @@ class Proxy void send(const AMQBody&); ProtocolVersion getVersion() const; + FrameHandler& getHandler() { return out; } protected: FrameHandler& out; diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp index 700aeef47c..715cdaec2a 100644 --- a/qpid/cpp/src/tests/exception_test.cpp +++ b/qpid/cpp/src/tests/exception_test.cpp @@ -92,7 +92,7 @@ BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) { BOOST_CHECK_THROW(session.close(), InternalErrorException); } -BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) { +BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, ProxySessionFixture) { BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException); } diff --git a/qpid/cpp/src/tests/serialize.cpp b/qpid/cpp/src/tests/serialize.cpp index 72a92ee226..a120be6458 100644 --- a/qpid/cpp/src/tests/serialize.cpp +++ b/qpid/cpp/src/tests/serialize.cpp @@ -79,7 +79,7 @@ template <class A, class B, class C, class D> struct concat4 { typedef typename typedef mpl::vector<Bit, Boolean, Char, Int32, Int64, Int8, Uint16, CharUtf32, Uint32, Uint64, Bin8, Uint8>::type IntegralTypes; typedef mpl::vector<Bin1024, Bin128, Bin16, Bin256, Bin32, Bin40, Bin512, Bin64, Bin72>::type BinTypes; typedef mpl::vector<Double, Float>::type FloatTypes; -typedef mpl::vector<SequenceNo, Uuid, DateTime, Dec32, Dec64> FixedSizeClassTypes; +typedef mpl::vector<SequenceNo, Uuid, Datetime, Dec32, Dec64> FixedSizeClassTypes; typedef mpl::vector<Vbin8, Str8Latin, Str8, Str8Utf16, Vbin16, Str16Latin, Str16, Str16Utf16, Vbin32> VariableSizeTypes; @@ -106,7 +106,7 @@ BOOST_AUTO_TEST_CASE(testNetworkByteOrder) { void testValue(bool& b) { b = true; } template <class T> typename boost::enable_if<boost::is_arithmetic<T> >::type testValue(T& n) { n=42; } void testValue(long long& l) { l = 12345; } -void testValue(DateTime& dt) { dt = qpid::sys::now(); } +void testValue(Datetime& dt) { dt = qpid::sys::now(); } void testValue(Uuid& uuid) { uuid=Uuid(true); } template <class E, class M> void testValue(Decimal<E,M>& d) { d.exponent=2; d.mantissa=1234; } void testValue(SequenceNo& s) { s = 42; } diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index bbdb501b80..d1e3293a3e 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -20,7 +20,7 @@ - --> -<amqp major="0" minor="10" port="5672"> +<amqp major="99" minor="0" port="5672"> <class name = "cluster" index = "201"> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 0e36a09bbd..e02b3d6643 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.net.URISyntaxException; + import javax.jms.Destination; import javax.naming.NamingException; import javax.naming.Reference; @@ -31,7 +33,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; public abstract class AMQDestination implements Destination, Referenceable @@ -50,6 +51,8 @@ public abstract class AMQDestination implements Destination, Referenceable private AMQShortString _routingKey; + private AMQShortString[] _bindingKeys; + private String _url; private AMQShortString _urlAsShortString; @@ -64,7 +67,7 @@ public abstract class AMQDestination implements Destination, Referenceable public static final Integer TOPIC_TYPE = Integer.valueOf(2); public static final Integer UNKNOWN_TYPE = Integer.valueOf(3); - protected AMQDestination(String url) throws URLSyntaxException + protected AMQDestination(String url) throws URISyntaxException { this(new AMQBindingURL(url)); } @@ -79,26 +82,43 @@ public abstract class AMQDestination implements Destination, Referenceable _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName()); _routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey()); + _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) { - this(exchangeName, exchangeClass, routingKey, false, false, queueName); + this(exchangeName, exchangeClass, routingKey, false, false, queueName, null); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, AMQShortString[] bindingKeys) + { + this(exchangeName, exchangeClass, routingKey, false, false, queueName,bindingKeys); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName) { - this(exchangeName, exchangeClass, destinationName, false, false, null); + this(exchangeName, exchangeClass, destinationName, false, false, null,null); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName) + boolean isAutoDelete, AMQShortString queueName) { - this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false); + this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,null); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + boolean isAutoDelete, AMQShortString queueName,AMQShortString[] bindingKeys) + { + this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,bindingKeys); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable){ + this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,null); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys) { // If used with a fannout exchange, the routing key can be null if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null) @@ -120,6 +140,7 @@ public abstract class AMQDestination implements Destination, Referenceable _isAutoDelete = isAutoDelete; _queueName = queueName; _isDurable = isDurable; + _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys; } public AMQShortString getEncodedName() @@ -181,6 +202,20 @@ public abstract class AMQDestination implements Destination, Referenceable return _routingKey; } + public AMQShortString[] getBindingKeys() + { + if (_bindingKeys != null && _bindingKeys.length > 0) + { + return _bindingKeys; + } + else + { + // catering to the common use case where the + //routingKey is the same as the bindingKey. + return new AMQShortString[]{_routingKey}; + } + } + public boolean isExclusive() { return _isExclusive; @@ -239,6 +274,21 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); } + // We can't allow both routingKey and bindingKey + if (_routingKey == null && _bindingKeys != null && _bindingKeys.length>0) + { + + for (AMQShortString bindingKey:_bindingKeys) + { + sb.append(BindingURL.OPTION_BINDING_KEY); + sb.append("='"); + sb.append(bindingKey); + sb.append("'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + + } + } + if (_isDurable) { sb.append(BindingURL.OPTION_DURABLE); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 924b20e3ba..78b01add14 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -61,8 +61,14 @@ public class AMQQueue extends AMQDestination implements Queue public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) { super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, - false, queueName, false); } + false, queueName, false); + } + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) + { + super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, + false, queueName, false,bindingKeys); + } /** * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. @@ -126,11 +132,15 @@ public class AMQQueue extends AMQDestination implements Queue this(exchangeName, routingKey, queueName, exclusive, autoDelete, false); } - public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) { + this(exchangeName,routingKey,queueName,exclusive,autoDelete,durable,null); + } + + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys) + { super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive, - autoDelete, queueName, durable); + autoDelete, queueName, durable, bindingKeys); } public AMQShortString getRoutingKey() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ac5055ccd0..95d9b45b10 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.io.Serializable; +import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; @@ -86,7 +87,6 @@ import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -564,19 +564,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException + final AMQShortString exchangeName,final AMQDestination destination) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - QueueBindBody body = getMethodRegistry().createQueueBindBody(getTicket(),queueName,exchangeName,routingKey,false,arguments); - - AMQFrame queueBind = body.generateFrame(_channelId); - - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); - + sendQueueBind(queueName,routingKey,arguments,exchangeName,destination); return null; } }, _connection).execute(); @@ -587,12 +582,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { if( consumer.getQueuename() != null) { - bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName()); + bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd); } } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException, FailoverException; + final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException; /** @@ -1036,7 +1031,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { return new AMQQueue(new AMQBindingURL(queueName)); } - catch (URLSyntaxException urlse) + catch (URISyntaxException urlse) { JMSException jmse = new JMSException(urlse.getReason()); jmse.setLinkedException(urlse); @@ -1253,7 +1248,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { return new AMQTopic(new AMQBindingURL(topicName)); } - catch (URLSyntaxException urlse) + catch (URISyntaxException urlse) { JMSException jmse = new JMSException(urlse.getReason()); jmse.setLinkedException(urlse); @@ -1380,6 +1375,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + public void declareAndBind(AMQDestination amqd) + throws + AMQException + { + AMQProtocolHandler protocolHandler = getProtocolHandler(); + declareExchange(amqd, protocolHandler, false); + AMQShortString queueName = declareQueue(amqd, protocolHandler); + bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd); + } + /** * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message. * @@ -1820,35 +1825,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Be aware of possible changes to parameter order as versions change. */ - boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException - { - try - { - AMQMethodEvent response = - new FailoverRetrySupport<AMQMethodEvent, AMQException>( - new FailoverProtectedOperation<AMQMethodEvent, AMQException>() - { - public AMQMethodEvent execute() throws AMQException, FailoverException - { - ExchangeBoundBody body = getMethodRegistry().createExchangeBoundBody(exchangeName, routingKey, queueName); - AMQFrame boundFrame = body.generateFrame(_channelId); - - return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); - - } - }, _connection).execute(); - - // Extract and return the response code from the query. - ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); - - return (responseBody.getReplyCode() == 0); - } - catch (AMQException e) - { - throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); - } - } + public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException; + + public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover @@ -2509,16 +2489,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public void declareAndBind(AMQDestination amqd) - throws - AMQException - { - AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName()); - } - /** * Callers must hold the failover mutex before calling this method. * @@ -2540,7 +2510,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.setQueuename(queueName); // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName()); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 407f1f3786..8039e3a163 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import javax.jms.*; import javax.jms.IllegalStateException; + import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; @@ -159,7 +160,7 @@ public class AMQSession_0_10 extends AMQSession AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - + _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); @@ -213,7 +214,7 @@ public class AMQSession_0_10 extends AMQSession * @param arguments 0_8 specific */ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName) + final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) throws AMQException, FailoverException { Map args = FiledTableSupport.convertToMap(arguments); @@ -222,7 +223,12 @@ public class AMQSession_0_10 extends AMQSession { args.put("x-match", "any"); } - getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), args); + + for (AMQShortString rk: destination.getBindingKeys()) + { + _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); + getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); + } // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); @@ -238,6 +244,7 @@ public class AMQSession_0_10 extends AMQSession */ public void sendClose(long timeout) throws AMQException, FailoverException { + getQpidSession().sync(); getQpidSession().sessionClose(); getCurrentException(); } @@ -350,19 +357,37 @@ public class AMQSession_0_10 extends AMQSession /** * Bind a queue with an exchange. */ - public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, - final AMQShortString routingKey) throws JMSException + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException + { + return isQueueBound(exchangeName,queueName,routingKey,null); + } + + public boolean isQueueBound(final AMQDestination destination) throws JMSException + { + return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys()); + } + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) + throws JMSException { String rk = ""; boolean res; - if (routingKey != null) + if (bindingKeys != null && bindingKeys.length>0) + { + rk = bindingKeys[0].toString(); + } + else if (routingKey != null) { rk = routingKey.toString(); } + Future<BindingQueryResult> result = - getQpidSession().bindingQuery(exchangeName.toString(), queueName.toString(), rk, null); + getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null); BindingQueryResult bindingQueryResult = result.get(); - if (routingKey == null) + + if (rk == null) { res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); } @@ -577,7 +602,7 @@ public class AMQSession_0_10 extends AMQSession { // this is done so that we can produce to a temporary queue beofre we create a consumer sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); - sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName()); + sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result); result.setQueueName(result.getRoutingKey()); } catch (Exception e) @@ -701,7 +726,7 @@ public class AMQSession_0_10 extends AMQSession AMQShortString topicName; if (topic instanceof AMQTopic) { - topicName=((AMQTopic) topic).getRoutingKey(); + topicName=((AMQTopic) topic).getBindingKeys()[0]; } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 59129bd28c..0450cffcaf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -94,7 +94,7 @@ public class AMQSession_0_8 extends AMQSession } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException, FailoverException + final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException { getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody (getTicket(),queueName,exchangeName,routingKey,false,arguments). @@ -171,6 +171,11 @@ public class AMQSession_0_8 extends AMQSession } } + public boolean isQueueBound(final AMQDestination destination) throws JMSException + { + return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName()); + } + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index e571b5437e..40041afdc6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -49,6 +49,11 @@ public class AMQTopic extends AMQDestination implements Topic super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); } + public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) + { + super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys); + } + public AMQTopic(AMQConnection conn, String routingKey) { this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey)); @@ -77,6 +82,11 @@ public class AMQTopic extends AMQDestination implements Topic super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable ); } + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys) + { + super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys); + } public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 74e466acc4..25494428ea 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -51,7 +51,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me /** The connection being used by this consumer */ protected final AMQConnection _connection; - private final String _messageSelector; + protected final String _messageSelector; private final boolean _noLocal; @@ -740,6 +740,8 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } else { + // we should not be allowed to add a message is the + // consumer is closed _synchronousQueue.put(jmsMessage); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ef0f219092..74aac53cda 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -38,7 +38,6 @@ import javax.jms.JMSException; import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicLong; import java.util.Iterator; /** @@ -122,8 +121,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } catch (AMQException e) { + _logger.error("Receivecd an Exception when receiving message",e); try { + getSession().getAMQConnection().getExceptionListener() .onException(new JMSAMQException("Error when receiving message", e)); } @@ -135,6 +136,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } if (messageOk) { + _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); } } @@ -290,7 +292,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // TODO Use a tag for fiding out if message filtering is done here or by the broker. try { - if (getMessageSelector() != null && !getMessageSelector().equals("")) + if (_messageSelector != null && !_messageSelector.equals("")) { messageOk = _filter.matches(message); } @@ -332,6 +334,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _logger.debug("filterMessage - trying to acquire message"); } messageOk = acquireMessage(message); + _logger.debug("filterMessage - *************************************"); + _logger.debug("filterMessage - message acquire status : " + messageOk); + _logger.debug("filterMessage - *************************************"); } return messageOk; } @@ -393,13 +398,29 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _0_10session.getQpidSession() .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE); + + _logger.debug("acquireMessage, sent acquire message to broker"); + _0_10session.getQpidSession().sync(); + + _logger.debug("acquireMessage, returned from sync"); + RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages(); + + _logger.debug("acquireMessage, acquired range set " + acquired); + if (acquired != null && acquired.size() > 0) { result = true; } + + _logger.debug("acquireMessage, Trying to get current exception "); + _0_10session.getCurrentException(); + + _logger.debug("acquireMessage, returned from getting current exception "); + + _logger.debug("acquireMessage, acquired range set " + acquired + " now returning " ); } return result; } @@ -473,4 +494,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _session.rejectMessage(message, true); } } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 42c1d687cb..c6baf0b0fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -17,22 +17,22 @@ */ package org.apache.qpid.client; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import java.io.IOException; +import java.net.URISyntaxException; + +import javax.jms.JMSException; +import javax.jms.Message; + import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.FiledTableSupport; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpidity.njms.ExceptionHelper; import org.apache.qpidity.nclient.util.ByteBufferMessage; -import org.apache.qpidity.transport.ReplyTo; +import org.apache.qpidity.njms.ExceptionHelper; import org.apache.qpidity.transport.DeliveryProperties; - -import javax.jms.Message; -import javax.jms.JMSException; -import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.qpidity.transport.ReplyTo; /** * This is a 0_10 message producer. @@ -154,12 +154,20 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer String replyToURL = contentHeaderProperties.getReplyToAsString(); if (replyToURL != null) { + if(_logger.isDebugEnabled()) + { + StringBuffer b = new StringBuffer(); + b.append("\n=========================="); + b.append("\nReplyTo : " + replyToURL); + b.append("\n=========================="); + _logger.debug(b.toString()); + } AMQBindingURL dest; try { dest = new AMQBindingURL(replyToURL); } - catch (URLSyntaxException e) + catch (URISyntaxException e) { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } @@ -198,8 +206,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(), - destination.getRoutingKey()); + return _session.isQueueBound(destination); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index abca931acc..5baf058668 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,30 +20,34 @@ */ package org.apache.qpid.client.message; -import org.apache.commons.collections.map.ReferenceMap; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; +import java.util.UUID; -import org.apache.mina.common.ByteBuffer; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; +import org.apache.commons.collections.map.ReferenceMap; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.client.*; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; - -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; -import java.util.UUID; -import java.io.IOException; -import java.net.URISyntaxException; public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 4eeb702703..bf2a2fdd73 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -140,8 +140,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory props.setType(mprop.getType()); props.setUserId(mprop.getUserId()); props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders())); - AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props); - message.receivedFromServer(); + AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props); return message; } @@ -152,7 +151,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory { final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies); msg.setJMSRedelivered(redelivered); - + msg.receivedFromServer(); return msg; } @@ -164,7 +163,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory final AbstractJMSMessage msg = create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL); msg.setJMSRedelivered(redelivered); - + msg.receivedFromServer(); return msg; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index b3f46ab0f6..43ac56dee9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -20,6 +20,24 @@ */ package org.apache.qpid.jndi; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; + import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQHeadersExchange; @@ -30,28 +48,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Queue; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.spi.InitialContextFactory; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; - public class PropertiesFileInitialContextFactory implements InitialContextFactory { protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class); @@ -184,6 +183,17 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor Topic t = createTopic(entry.getValue().toString()); if (t != null) { + if (_logger.isDebugEnabled()) + { + StringBuffer b = new StringBuffer(); + b.append("Creating the topic: " + jndiName + " with the following binding keys "); + for (AMQShortString binding:((AMQTopic)t).getBindingKeys()) + { + b.append(binding.toString()).append(","); + } + + _logger.debug(b.toString()); + } data.put(jndiName, t); } } @@ -219,7 +229,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } catch (URISyntaxException urlse) { - _logger.warn("Unable to destination:" + urlse); + _logger.warn("Unable to create destination:" + urlse, urlse); return null; } @@ -268,7 +278,17 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } else if (value instanceof String) { - return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value)); + String[] keys = ((String)value).split(","); + AMQShortString[] bindings = new AMQShortString[keys.length]; + int i = 0; + for (String key:keys) + { + bindings[i] = new AMQShortString(key); + i++; + } + // The Destination has a dual nature. If this was used for a producer the key is used + // for the routing key. If it was used for the consumer it becomes the bindingKey + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings); } else if (value instanceof BindingURL) { diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java new file mode 100644 index 0000000000..83d491baad --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/Constants.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpidity.nclient.impl; + +/** + * This class holds all the 0.10 client constants which value can be set + * through properties. + */ +public class Constants +{ + static + { + + String max="message_size_before_sync";// KB's + try + { + MAX_NOT_SYNC_DATA_LENGH=new Long(System.getProperties().getProperty(max, "200000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_SYNC_DATA_LENGH=200000000; + } + String flush="message_size_before_flush"; + try + { + MAX_NOT_FLUSH_DATA_LENGH=new Long(System.getProperties().getProperty(flush, "2000000")); + } + catch (NumberFormatException e) + { + // use default size + MAX_NOT_FLUSH_DATA_LENGH=20000000; + } + } + + /** + * The total message size in KBs that can be transferted before + * client and broker are synchronized. + * A sync will result in the client library releasing the sent messages + * from memory. (messages are kept + * in memory so client can reconnect to a broker in the event of a failure) + * <p> + * Property name: message_size_before_sync + * <p> + * Default value: 200000000 + */ + public static long MAX_NOT_SYNC_DATA_LENGH; + /** + * The total message size in KBs that can be transferted before + * messages are flushed. + * When a flush returns all messages have reached the broker. + * <p> + * Property name: message_size_before_flush + * <p> + * Default value: 200000000 + */ + public static long MAX_NOT_FLUSH_DATA_LENGH; + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java index 66be1ebc73..88dd212ab6 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,12 +23,18 @@ package org.apache.qpid.test.unit.client.destinationurl; import junit.framework.TestCase; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.test.unit.basic.PropertyValueTest; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URISyntaxException; public class DestinationURLTest extends TestCase { - public void testFullURL() throws URLSyntaxException + private static final Logger _logger = LoggerFactory.getLogger(DestinationURLTest.class); + + public void testFullURL() throws URISyntaxException { String url = "exchange.Class://exchangeName/Destination/Queue"; @@ -43,7 +49,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("Queue")); } - public void testQueue() throws URLSyntaxException + public void testQueue() throws URISyntaxException { String url = "exchangeClass://exchangeName//Queue"; @@ -58,7 +64,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("Queue")); } - public void testQueueWithOption() throws URLSyntaxException + public void testQueueWithOption() throws URISyntaxException { String url = "exchangeClass://exchangeName//Queue?option='value'"; @@ -75,7 +81,7 @@ public class DestinationURLTest extends TestCase } - public void testDestination() throws URLSyntaxException + public void testDestination() throws URISyntaxException { String url = "exchangeClass://exchangeName/Destination/"; @@ -90,7 +96,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getQueueName().equals("")); } - public void testDestinationWithOption() throws URLSyntaxException + public void testDestinationWithOption() throws URISyntaxException { String url = "exchangeClass://exchangeName/Destination/?option='value'"; @@ -107,7 +113,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getOption("option").equals("value")); } - public void testDestinationWithMultiOption() throws URLSyntaxException + public void testDestinationWithMultiOption() throws URISyntaxException { String url = "exchangeClass://exchangeName/Destination/?option='value',option2='value2'"; @@ -123,7 +129,7 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getOption("option2").equals("value2")); } - public void testDestinationWithNoExchangeDefaultsToDirect() throws URLSyntaxException + public void testDestinationWithNoExchangeDefaultsToDirect() throws URISyntaxException { String url = "IBMPerfQueue1?durable='true'"; @@ -138,6 +144,41 @@ public class DestinationURLTest extends TestCase assertTrue(dest.getOption("durable").equals("true")); } + public void testDestinationWithMultiBindingKeys() throws URISyntaxException + { + + String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',bindingkey='key2'"; + + AMQBindingURL dest = new AMQBindingURL(url); + + assertTrue(dest.getExchangeClass().equals("exchangeClass")); + assertTrue(dest.getExchangeName().equals("exchangeName")); + assertTrue(dest.getDestinationName().equals("Destination")); + assertTrue(dest.getQueueName().equals("")); + + assertTrue(dest.getBindingKeys().length == 2); + } + + // You can only specify only a routing key or binding key, but not both. + public void testDestinationIfOnlyRoutingKeyOrBindingKeyIsSpecified() throws URISyntaxException + { + + String url = "exchangeClass://exchangeName/Destination/?bindingkey='key1',routingkey='key2'"; + boolean exceptionThrown = false; + try + { + + AMQBindingURL dest = new AMQBindingURL(url); + } + catch(URISyntaxException e) + { + exceptionThrown = true; + _logger.info("Exception thrown",e); + } + + assertTrue("Failed to throw an URISyntaxException when both the bindingkey and routingkey is specified",exceptionThrown); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(DestinationURLTest.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 529a05b2e2..998242925c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -20,28 +20,29 @@ */ package org.apache.qpid.url; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; - public class AMQBindingURL implements BindingURL { private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class); String _url; - AMQShortString _exchangeClass; - AMQShortString _exchangeName; - AMQShortString _destinationName; - AMQShortString _queueName; + AMQShortString _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + AMQShortString _exchangeName = new AMQShortString(""); + AMQShortString _destinationName = new AMQShortString("");; + AMQShortString _queueName = new AMQShortString(""); + AMQShortString[] _bindingKeys = new AMQShortString[0]; private HashMap<String, String> _options; - public AMQBindingURL(String url) throws URLSyntaxException + public AMQBindingURL(String url) throws URISyntaxException { // format: // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* @@ -52,116 +53,35 @@ public class AMQBindingURL implements BindingURL parseBindingURL(); } - private void parseBindingURL() throws URLSyntaxException + private void parseBindingURL() throws URISyntaxException { - try - { - URI connection = new URI(_url); - - String exchangeClass = connection.getScheme(); - - if (exchangeClass == null) - { - _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url; - // URLHelper.parseError(-1, "Exchange Class not specified.", _url); - parseBindingURL(); - - return; - } - else - { - setExchangeClass(exchangeClass); - } - - String exchangeName = connection.getHost(); - - if (exchangeName == null) - { - if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) - { - setExchangeName(""); - } - else - { - throw URLHelper.parseError(-1, "Exchange Name not specified.", _url); - } - } - else - { - setExchangeName(exchangeName); - } - - String queueName; - - if ((connection.getPath() == null) || connection.getPath().equals("")) - { - throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), - "Destination or Queue requried", _url); - } - else - { - int slash = connection.getPath().indexOf("/", 1); - if (slash == -1) - { - throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), - "Destination requried", _url); - } - else - { - String path = connection.getPath(); - setDestinationName(path.substring(1, slash)); - - // We don't set queueName yet as the actual value we use depends on options set - // when we are dealing with durable subscriptions - - queueName = path.substring(slash + 1); - - } - } - - URLHelper.parseOptions(_options, connection.getQuery()); - - processOptions(); - - // We can now call setQueueName as the URL is full parsed. - - setQueueName(queueName); - - // Fragment is #string (not used) - _logger.debug("URL Parsed: " + this); - - } - catch (URISyntaxException uris) - { - - throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); - - } + BindingURLParser parser = new BindingURLParser(_url,this); + processOptions(); + _logger.debug("URL Parsed: " + this); } - private void setExchangeClass(String exchangeClass) + public void setExchangeClass(String exchangeClass) { setExchangeClass(new AMQShortString(exchangeClass)); } - private void setQueueName(String name) throws URLSyntaxException + public void setQueueName(String name) { setQueueName(new AMQShortString(name)); } - private void setDestinationName(String name) + public void setDestinationName(String name) { setDestinationName(new AMQShortString(name)); } - private void setExchangeName(String exchangeName) + public void setExchangeName(String exchangeName) { setExchangeName(new AMQShortString(exchangeName)); } - private void processOptions() + private void processOptions() throws URISyntaxException { - // this is where we would parse any options that needed more than just storage. } public String getURL() @@ -210,34 +130,9 @@ public class AMQBindingURL implements BindingURL return _queueName; } - public void setQueueName(AMQShortString name) throws URLSyntaxException + public void setQueueName(AMQShortString name) { - if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) - { - if (Boolean.parseBoolean(getOption(OPTION_DURABLE))) - { - if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) - { - _queueName = - new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION)); - } - else - { - throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID - + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url); - - } - } - else - { - _queueName = null; - } - } - else - { - _queueName = name; - } - + _queueName = name; } public String getOption(String key) @@ -261,7 +156,7 @@ public class AMQBindingURL implements BindingURL { if (containsOption(BindingURL.OPTION_ROUTING_KEY)) { - return new AMQShortString(getOption(OPTION_ROUTING_KEY)); + return new AMQShortString((String)getOption(OPTION_ROUTING_KEY)); } else { @@ -271,12 +166,29 @@ public class AMQBindingURL implements BindingURL if (containsOption(BindingURL.OPTION_ROUTING_KEY)) { - return new AMQShortString(getOption(OPTION_ROUTING_KEY)); + return new AMQShortString((String)getOption(OPTION_ROUTING_KEY)); } return getDestinationName(); } + public AMQShortString[] getBindingKeys() + { + if (_bindingKeys != null && _bindingKeys.length>0) + { + return _bindingKeys; + } + else + { + return new AMQShortString[]{getRoutingKey()}; + } + } + + public void setBindingKeys(AMQShortString[] keys) + { + _bindingKeys = keys; + } + public void setRoutingKey(AMQShortString key) { setOption(OPTION_ROUTING_KEY, key.toString()); @@ -296,6 +208,29 @@ public class AMQBindingURL implements BindingURL sb.append(URLHelper.printOptions(_options)); - return sb.toString(); + // temp hack + if (getRoutingKey() == null || getRoutingKey().toString().equals("")) + { + + if (sb.toString().indexOf("?") == -1) + { + sb.append("?"); + } + else + { + sb.append("&"); + } + + for (AMQShortString key :_bindingKeys) + { + sb.append(BindingURL.OPTION_BINDING_KEY).append("='").append(key.toString()).append("'&"); + } + + return sb.toString().substring(0,sb.toString().length()-1); + } + else + { + return sb.toString(); + } } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 67be2db86f..25450fea64 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,6 +34,7 @@ public interface BindingURL public static final String OPTION_CLIENTID = "clientid"; public static final String OPTION_SUBSCRIPTION = "subscription"; public static final String OPTION_ROUTING_KEY = "routingkey"; + public static final String OPTION_BINDING_KEY = "bindingkey"; String getURL(); @@ -52,5 +53,7 @@ public interface BindingURL AMQShortString getRoutingKey(); + AMQShortString[] getBindingKeys(); + String toString(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java new file mode 100644 index 0000000000..5d26e7e65b --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java @@ -0,0 +1,445 @@ +package org.apache.qpid.url; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BindingURLParser +{ + private static final char PROPERTY_EQUALS_CHAR = '='; + private static final char PROPERTY_SEPARATOR_CHAR = '&'; + private static final char ALTERNATIVE_PROPERTY_SEPARATOR_CHAR = ','; + private static final char FORWARD_SLASH_CHAR = '/'; + private static final char QUESTION_MARK_CHAR = '?'; + private static final char SINGLE_QUOTE_CHAR = '\''; + private static final char COLON_CHAR = ':'; + private static final char END_OF_URL_MARKER_CHAR = '%'; + + private static final Logger _logger = LoggerFactory.getLogger(BindingURLImpl.class); + + private char[] _url; + private AMQBindingURL _bindingURL; + private BindingURLParserState _currentParserState; + private String _error; + private int _index = 0; + private String _currentPropName; + private Map<String,Object> _options = new HashMap<String,Object>(); + + //<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + public BindingURLParser(String url,AMQBindingURL bindingURL) throws URISyntaxException + { + _url = (url + END_OF_URL_MARKER_CHAR).toCharArray(); + _bindingURL = bindingURL; + _currentParserState = BindingURLParserState.BINDING_URL_START; + BindingURLParserState prevState = _currentParserState; + + try + { + while (_currentParserState != BindingURLParserState.ERROR && _currentParserState != BindingURLParserState.BINDING_URL_END) + { + prevState = _currentParserState; + _currentParserState = next(); + } + + if (_currentParserState == BindingURLParserState.ERROR) + { + _error = + "Invalid URL format [current_state = " + prevState + ", details parsed so far " + _bindingURL + " ] error at (" + _index + ") due to " + _error; + _logger.debug(_error); + URISyntaxException ex; + ex = new URISyntaxException(markErrorLocation(),"Error occured while parsing URL",_index); + throw ex; + } + + processOptions(); + } + catch (ArrayIndexOutOfBoundsException e) + { + _error = "Invalid URL format [current_state = " + prevState + ", details parsed so far " + _bindingURL + " ] error at (" + _index + ")"; + URISyntaxException ex = new URISyntaxException(markErrorLocation(),"Error occured while parsing URL",_index); + ex.initCause(e); + throw ex; + } + } + + enum BindingURLParserState + { + BINDING_URL_START, + EXCHANGE_CLASS, + COLON_CHAR, + DOUBLE_SEP, + EXCHANGE_NAME, + EXCHANGE_SEPERATOR_CHAR, + DESTINATION, + DESTINATION_SEPERATOR_CHAR, + QUEUE_NAME, + QUESTION_MARK_CHAR, + PROPERTY_NAME, + PROPERTY_EQUALS, + START_PROPERTY_VALUE, + PROPERTY_VALUE, + END_PROPERTY_VALUE, + PROPERTY_SEPARATOR, + BINDING_URL_END, + ERROR + } + + /** + * I am fully ware that there are few optimizations + * that can speed up things a wee bit. But I have opted + * for readability and maintainability at the expense of + * speed, as speed is not a critical factor here. + * + * One can understand the full parse logic by just looking at this method. + */ + private BindingURLParserState next() + { + switch (_currentParserState) + { + case BINDING_URL_START: + return extractExchangeClass(); + case COLON_CHAR: + _index++; //skip ":" + return BindingURLParserState.DOUBLE_SEP; + case DOUBLE_SEP: + _index = _index + 2; //skip "//" + return BindingURLParserState.EXCHANGE_NAME; + case EXCHANGE_NAME: + return extractExchangeName(); + case EXCHANGE_SEPERATOR_CHAR: + _index++; // skip '/' + return BindingURLParserState.DESTINATION; + case DESTINATION: + return extractDestination(); + case DESTINATION_SEPERATOR_CHAR: + _index++; // skip '/' + return BindingURLParserState.QUEUE_NAME; + case QUEUE_NAME: + return extractQueueName(); + case QUESTION_MARK_CHAR: + _index++; // skip '?' + return BindingURLParserState.PROPERTY_NAME; + case PROPERTY_NAME: + return extractPropertyName(); + case PROPERTY_EQUALS: + _index++; // skip the equal sign + return BindingURLParserState.START_PROPERTY_VALUE; + case START_PROPERTY_VALUE: + _index++; // skip the '\'' + return BindingURLParserState.PROPERTY_VALUE; + case PROPERTY_VALUE: + return extractPropertyValue(); + case END_PROPERTY_VALUE: + _index ++; + return checkEndOfURL(); + case PROPERTY_SEPARATOR: + _index++; // skip '&' + return BindingURLParserState.PROPERTY_NAME; + default: + return BindingURLParserState.ERROR; + } + } + + private BindingURLParserState extractExchangeClass() + { + char nextChar = _url[_index]; + + // check for the following special cases. + // "myQueue?durable='true'" or just "myQueue"; + + StringBuilder builder = new StringBuilder(); + while (nextChar != COLON_CHAR && nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + // normal use case + if (nextChar == COLON_CHAR) + { + _bindingURL.setExchangeClass(builder.toString()); + return BindingURLParserState.COLON_CHAR; + } + // "myQueue?durable='true'" use case + else if (nextChar == QUESTION_MARK_CHAR) + { + _bindingURL.setExchangeClass(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()); + _bindingURL.setExchangeName(""); + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.QUESTION_MARK_CHAR; + } + else + { + _bindingURL.setExchangeClass(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()); + _bindingURL.setExchangeName(""); + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.BINDING_URL_END; + } + } + + private BindingURLParserState extractExchangeName() + { + char nextChar = _url[_index]; + StringBuilder builder = new StringBuilder(); + while (nextChar != FORWARD_SLASH_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + _bindingURL.setExchangeName(builder.toString()); + return BindingURLParserState.EXCHANGE_SEPERATOR_CHAR; + } + + private BindingURLParserState extractDestination() + { + char nextChar = _url[_index]; + + //The destination is and queue name are both optional + // This is checking for the case where both are not specified. + if (nextChar == QUESTION_MARK_CHAR) + { + return BindingURLParserState.QUESTION_MARK_CHAR; + } + + StringBuilder builder = new StringBuilder(); + while (nextChar != FORWARD_SLASH_CHAR && nextChar != QUESTION_MARK_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + // This is the case where the destination is explictily stated. + // ex direct://amq.direct/myDest/myQueue?option1='1' ... OR + // direct://amq.direct//myQueue?option1='1' ... + if (nextChar == FORWARD_SLASH_CHAR) + { + _bindingURL.setDestinationName(builder.toString()); + return BindingURLParserState.DESTINATION_SEPERATOR_CHAR; + } + // This is the case where destination is not explictly stated. + // ex direct://amq.direct/myQueue?option1='1' ... + else + { + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.QUESTION_MARK_CHAR; + } + } + + private BindingURLParserState extractQueueName() + { + char nextChar = _url[_index]; + StringBuilder builder = new StringBuilder(); + while (nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR) + { + builder.append(nextChar); + nextChar = _url[++_index]; + } + _bindingURL.setQueueName(builder.toString()); + + if(nextChar == QUESTION_MARK_CHAR) + { + return BindingURLParserState.QUESTION_MARK_CHAR; + } + else + { + return BindingURLParserState.BINDING_URL_END; + } + } + + private BindingURLParserState extractPropertyName() + { + StringBuilder builder = new StringBuilder(); + char next = _url[_index]; + while (next != PROPERTY_EQUALS_CHAR) + { + builder.append(next); + next = _url[++_index]; + } + _currentPropName = builder.toString(); + + if (_currentPropName.trim().equals("")) + { + _error = "Property name cannot be empty"; + return BindingURLParserState.ERROR; + } + + return BindingURLParserState.PROPERTY_EQUALS; + } + + private BindingURLParserState extractPropertyValue() + { + StringBuilder builder = new StringBuilder(); + char next = _url[_index]; + while (next != SINGLE_QUOTE_CHAR) + { + builder.append(next); + next = _url[++_index]; + } + String propValue = builder.toString(); + + if (propValue.trim().equals("")) + { + _error = "Property values cannot be empty"; + return BindingURLParserState.ERROR; + } + else + { + if (_options.containsKey(_currentPropName)) + { + Object obj = _options.get(_currentPropName); + if (obj instanceof List) + { + List list = (List)obj; + list.add(propValue); + } + else // it has to be a string + { + List<String> list = new ArrayList(); + list.add((String)obj); + list.add(propValue); + _options.put(_currentPropName, list); + } + } + else + { + _options.put(_currentPropName, propValue); + } + + + return BindingURLParserState.END_PROPERTY_VALUE; + } + } + + private BindingURLParserState checkEndOfURL() + { + char nextChar = _url[_index]; + if ( nextChar == END_OF_URL_MARKER_CHAR) + { + return BindingURLParserState.BINDING_URL_END; + } + else if (nextChar == PROPERTY_SEPARATOR_CHAR || nextChar == ALTERNATIVE_PROPERTY_SEPARATOR_CHAR) + { + return BindingURLParserState.PROPERTY_SEPARATOR; + } + else + { + return BindingURLParserState.ERROR; + } + } + + private String markErrorLocation() + { + String tmp = String.valueOf(_url); + // length -1 to remove ENDOF URL marker + return tmp.substring(0,_index) + "^" + tmp.substring(_index+1> tmp.length()-1?tmp.length()-1:_index+1,tmp.length()-1); + } + + private void processOptions() throws URISyntaxException + { +// check for bindingKey + if (_options.containsKey(BindingURL.OPTION_BINDING_KEY) && _options.get(BindingURL.OPTION_BINDING_KEY) != null) + { + Object obj = _options.get(BindingURL.OPTION_BINDING_KEY); + + if (obj instanceof String) + { + AMQShortString[] bindingKeys = new AMQShortString[]{new AMQShortString((String)obj)}; + _bindingURL.setBindingKeys(bindingKeys); + } + else // it would be a list + { + List list = (List)obj; + AMQShortString[] bindingKeys = new AMQShortString[list.size()]; + int i=0; + for (Iterator it = list.iterator(); it.hasNext();) + { + bindingKeys[i] = new AMQShortString((String)it.next()); + i++; + } + _bindingURL.setBindingKeys(bindingKeys); + } + + } + for (String key: _options.keySet()) + { + // We want to skip the bindingKey list + if (_options.get(key) instanceof String) + { + _bindingURL.setOption(key, (String)_options.get(key)); + } + } + + + // check if both a binding key and a routing key is specified. + if (_options.containsKey(BindingURL.OPTION_BINDING_KEY) && _options.containsKey(BindingURL.OPTION_ROUTING_KEY)) + { + throw new URISyntaxException(String.valueOf(_url),"It is illegal to specify both a routingKey and a bindingKey in the same URL",-1); + } + + // check for durable subscriptions + if (_bindingURL.getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + String queueName = null; + if (Boolean.parseBoolean(_bindingURL.getOption(BindingURL.OPTION_DURABLE))) + { + if (_bindingURL.containsOption(BindingURL.OPTION_CLIENTID) && _bindingURL.containsOption(BindingURL.OPTION_SUBSCRIPTION)) + { + queueName = _bindingURL.getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + } + else + { + throw new URISyntaxException(String.valueOf(_url),"Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + + " and " + BindingURL.OPTION_SUBSCRIPTION , -1); + + } + } + _bindingURL.setQueueName(queueName); + } + } + + public static void main(String[] args) + { + String[] urls = new String[] + { + "topic://amq.topic//myTopic?routingkey='stocks.#'", + "topic://amq.topic/message_queue?bindingkey='usa.*'&bindingkey='control',exclusive='true'", + "topic://amq.topic//?bindingKey='usa.*',bindingkey='control',exclusive='true'", + "direct://amq.direct/dummyDest/myQueue?routingkey='abc.*'", + "exchange.Class://exchangeName/Destination/Queue", + "exchangeClass://exchangeName/Destination/?option='value',option2='value2'", + "IBMPerfQueue1?durable='true'", + "exchangeClass://exchangeName/Destination/?bindingkey='key1',bindingkey='key2'", + "exchangeClass://exchangeName/Destination/?bindingkey='key1'&routingkey='key2'" + }; + + try + { + for (String url: urls) + { + System.out.println("URL " + url); + AMQBindingURL bindingURL = new AMQBindingURL(url); + BindingURLParser parser = new BindingURLParser(url,bindingURL); + System.out.println("\nX " + bindingURL.toString() + " \n"); + + } + + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 1da06ab92b..b5f2efdf01 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -158,6 +158,7 @@ </target> <property name="test" value="*Test"/> + <property name="test.mem" value="512M"/> <property name="log" value="info"/> <property name="amqj.logging.level" value="${log}"/> @@ -190,6 +191,8 @@ description="execute unit tests"> <junit fork="yes" haltonfailure="no" printsummary="on" timeout="600000"> + <jvmarg value="-Xmx${test.mem}" /> + <sysproperty key="amqj.logging.level" value="${amqj.logging.level}"/> <sysproperty key="root.logging.level" value="${root.logging.level}"/> <sysproperty key="log4j.configuration" value="${log4j.configuration}"/> diff --git a/qpid/python/mllib/dom.py b/qpid/python/mllib/dom.py index 10b19d6db1..7c759dbdd5 100644 --- a/qpid/python/mllib/dom.py +++ b/qpid/python/mllib/dom.py @@ -72,7 +72,7 @@ class Dispatcher: cls = cls.base return False - def dispatch(self, f): + def dispatch(self, f, attrs = ""): cls = self while cls != None: if hasattr(f, cls.type): @@ -81,7 +81,6 @@ class Dispatcher: cls = cls.base cls = self - attrs = "" while cls != None: if attrs: sep = ", " @@ -151,9 +150,10 @@ class Tag(Node): def dispatch(self, f): try: - method = getattr(f, "do_" + self.name) + attr = "do_" + self.name + method = getattr(f, attr) except AttributeError: - return Dispatcher.dispatch(self, f) + return Dispatcher.dispatch(self, f, "'%s'" % attr) return method(self) class Leaf(Component, Dispatcher): diff --git a/qpid/python/qpid/queue.py b/qpid/python/qpid/queue.py index af0565b6cc..00946a9156 100644 --- a/qpid/python/qpid/queue.py +++ b/qpid/python/qpid/queue.py @@ -24,36 +24,51 @@ content of a queue can be notified if the queue is no longer in use. """ from Queue import Queue as BaseQueue, Empty, Full +from threading import Thread class Closed(Exception): pass class Queue(BaseQueue): END = object() + STOP = object() def __init__(self, *args, **kwargs): BaseQueue.__init__(self, *args, **kwargs) - self._real_put = self.put - self.listener = self._real_put + self.listener = None + self.thread = None def close(self): self.put(Queue.END) def get(self, block = True, timeout = None): - self.put = self._real_put - try: - result = BaseQueue.get(self, block, timeout) - if result == Queue.END: - # this guarantees that any other waiting threads or any future - # calls to get will also result in a Closed exception - self.put(Queue.END) - raise Closed() - else: - return result - finally: - self.put = self.listener - pass + result = BaseQueue.get(self, block, timeout) + if result == Queue.END: + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Closed exception + self.put(Queue.END) + raise Closed() + else: + return result def listen(self, listener): self.listener = listener - self.put = self.listener + if listener == None: + if self.thread != None: + self.put(Queue.STOP) + self.thread.join() + self.thread = None + else: + if self.thread == None: + self.thread = Thread(target = self.run) + self.thread.setDaemon(True) + self.thread.start() + + def run(self): + while True: + try: + o = self.get() + if o == Queue.STOP: break + self.listener(o) + except Closed: + break diff --git a/qpid/python/tests/queue.py b/qpid/python/tests/queue.py index d2e495d207..e12354eb43 100644 --- a/qpid/python/tests/queue.py +++ b/qpid/python/tests/queue.py @@ -30,37 +30,32 @@ class QueueTest (TestCase): # all the queue functionality. def test_listen(self): - LISTEN = object() - GET = object() - EMPTY = object() + values = [] + heard = threading.Event() + def listener(x): + values.append(x) + heard.set() q = Queue(0) - values = [] - q.listen(lambda x: values.append((LISTEN, x))) + q.listen(listener) + heard.clear() q.put(1) - assert values[-1] == (LISTEN, 1) + heard.wait() + assert values[-1] == 1 + heard.clear() q.put(2) - assert values[-1] == (LISTEN, 2) - - class Getter(threading.Thread): + heard.wait() + assert values[-1] == 2 - def run(self): - try: - values.append((GET, q.get(timeout=10))) - except Empty: - values.append(EMPTY) - - g = Getter() - g.start() - # let the other thread reach the get - time.sleep(2) + q.listen(None) q.put(3) - g.join() - - assert values[-1] == (GET, 3) + assert q.get(3) == 3 + q.listen(listener) + heard.clear() q.put(4) - assert values[-1] == (LISTEN, 4) + heard.wait() + assert values[-1] == 4 def test_close(self): q = Queue(0) |
