diff options
Diffstat (limited to 'qpid/cpp')
55 files changed, 1391 insertions, 232 deletions
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index 3d68ed7890..c17051d431 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -122,11 +122,11 @@ test -n "$RUBY" && generate=yes test -z "$RUBY" && AC_MSG_ERROR([Missing ruby installation (try "yum install ruby").]) specdir=`pwd`/$srcdir/../specs -AMQP_XML=$specdir/amqp.0-10-preview.xml -AC_SUBST(AMQP_XML) -ls $AMQP_XML >/dev/null 2>&1 || generate=no - -AM_CONDITIONAL([GENERATE], [test x$generate = xyes]) +AMQP_PREVIEW_XML=$specdir/amqp.0-10-preview.xml +AMQP_FINAL_XML=$specdir/amqp.0-10.xml +AC_SUBST(AMQP_PREVIEW_XML) +AC_SUBST(AMQP_FINAL_XML) +AM_CONDITIONAL([GENERATE], [ls $AMQP_FINAL_XML >/dev/null]) # URL and download URL for the package. URL=http://rhm.et.redhat.com/qpidc @@ -139,7 +139,6 @@ AC_CHECK_HEADERS([boost/shared_ptr.hpp uuid/uuid.h],, AC_MSG_ERROR([Missing required header files.])) # Check for optional CPG requirement. -save_ldflags=$LDFLAGS LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais" AC_ARG_WITH([cpg], @@ -157,15 +156,13 @@ AC_ARG_WITH([cpg], esac], [ # not specified - enable if libs/headers available. with_CPG=yes - AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no]) AC_CHECK_HEADERS([openais/cpg.h],,[with_CPG=no]) + AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no]) ] ) AM_CONDITIONAL([CPG], [test x$with_CPG = xyes]) if test x$with_CPG = xyes; then CPPFLAGS+=" -DCPG" -else - LDFLAGS=$save_ldflags fi # Files to generate diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in index 167247f67b..fd4a24b2e5 100644 --- a/qpid/cpp/qpidc.spec.in +++ b/qpid/cpp/qpidc.spec.in @@ -104,6 +104,7 @@ make check %files devel %defattr(-,root,root,-) %_includedir/qpid/*.h +%_includedir/qpid/amqp_0_10 %_includedir/qpid/client %_includedir/qpid/framing %_includedir/qpid/sys diff --git a/qpid/cpp/rubygen/0-10/specification.rb b/qpid/cpp/rubygen/0-10/specification.rb new file mode 100755 index 0000000000..026b49e1a9 --- /dev/null +++ b/qpid/cpp/rubygen/0-10/specification.rb @@ -0,0 +1,177 @@ +#!/usr/bin/env ruby +$: << ".." # Include .. in load path +require 'cppgen' + +class Specification < CppGen + def initialize(outdir, amqp) + super(outdir, amqp) + @ns="qpid::amqp_#{@amqp.version.bars}" + @dir="qpid/amqp_#{@amqp.version.bars}" + end + + # domains + + def domain_h(d) + genl + typename=d.name.typename + if d.enum + scope("enum #{typename} {", "};") { + genl d.enum.choices.map { |c| + "#{c.name.constname} = #{c.value}" }.join(",\n") + } + elsif (d.type_ == "array") + genl "typedef Array<#{ArrayTypes[d.name].amqp2cpp}> #{typename};" + else + genl "typedef #{d.type_.amqp2cpp} #{typename};" + end + end + + # class constants + + def class_h(c) + genl "const uint8_t CODE=#{c.code};" + genl "extern const char* NAME;" + end + + def class_cpp(c) + genl "const char* NAME=\"#{c.fqname}\";" + end + + # Used by structs, commands and controls. + def action_struct_h(x, base, consts, &block) + genl + struct(x.classname, "public #{base}") { + x.fields.each { |f| genl "#{f.type_.amqp2cpp} #{f.cppname};" } + genl + genl "static const char* NAME;" + consts.each { |c| genl "static const uint8_t #{c.upcase}=#{x.send c or 0};"} + genl "static const uint8_t CLASS_CODE=#{x.containing_class.nsname}::CODE;" + genl + genl "#{x.classname}();" + scope("#{x.classname}(",");") { genl x.parameters } unless x.fields.empty? + genl + genl "void accept(Visitor&) const;" + genl + yield if block + } + end + + def action_struct_cpp(x) + genl + genl "const char* #{x.classname}::NAME=\"#{x.fqname}\";" + genl + genl "#{x.classname}::#{x.classname}() {}"; + genl + if not x.fields.empty? + scope("#{x.classname}::#{x.classname}(",") :") { genl x.parameters } + indent() { genl x.initializers } + genl "{}" + genl + end + scope("void #{x.classname}::accept(Visitor&) const {","}") { + genl "// FIXME aconway 2008-02-27: todo" + } + end + + # structs + + def struct_h(s) action_struct_h(s, "Struct", ["size","pack","code"]); end + def struct_cpp(s) action_struct_cpp(s) end + + # command and control + + def action_h(a) + action_struct_h(a, a.base, ["code"]) { + scope("template <class T> void invoke(T& target) {","}") { + scope("target.#{a.funcname}(", ");") { genl a.values } + } + genl + scope("template <class S> void serialize(S& s) {","}") { + gen "s" + a.fields.each { |f| gen "(#{f.cppname})"} + genl ";" + } unless a.fields.empty? + } + end + + def action_cpp(a) action_struct_cpp(a); end + + # Types that must be generated early because they are used by other types. + def pregenerate?(x) not @amqp.used_by[x.fqname].empty?; end + + # Generate the log + def gen_specification() + h_file("#{@dir}/specification") { + include "#{@dir}/built_in_types" + include "#{@dir}/helpers" + include "<boost/call_traits.hpp>" + genl "using boost::call_traits;" + namespace(@ns) { + # Top level + @amqp.domains.each { |d| + # segment-type and track are are built in + domain_h d unless ["track","segment-type"].include?(d.name) + } + puts @amqp.used_by.inspect + + # Domains and structs that must be generated early because + # they are used by other definitions: + each_class_ns { |c| + class_h c + c.domains.each { |d| domain_h d if pregenerate? d } + c.structs.each { |s| struct_h s if pregenerate? s } + } + # Now dependent domains/structs and actions + each_class_ns { |c| + c.domains.each { |d| domain_h d if not pregenerate? d } + c.structs.each { |s| struct_h s if not pregenerate? s } + c.actions.each { |a| action_h a } + } + } + } + + cpp_file("#{@dir}/specification") { + include "#{@dir}/specification" + namespace(@ns) { + each_class_ns { |c| + class_cpp c + c.actions.each { |a| action_cpp a} + c.structs.each { |s| struct_cpp s } + } + } + } + end + + def gen_proxy() + h_file("#{@dir}/Proxy.h") { + include "#{@dir}/specification" + namespace(@ns) { + genl "template <class F, class R=F::result_type>" + cpp_class("ProxyTemplate") { + public + genl "ProxyTemplate(F f) : functor(f) {}" + @amqp.classes.each { |c| + c.actions.each { |a| + scope("R #{a.funcname}(", ")") { genl a.parameters } + scope() { + var=a.name.funcname + scope("#{a.classname} #{var}(",");") { genl a.arguments } + genl "return functor(#{var});" + } + } + } + private + genl "F functor;" + } + } + } + end + + def generate + gen_specification + gen_proxy + end +end + +Specification.new($outdir, $amqp).generate(); + diff --git a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb index 18f74a2e41..f9ef95f5a0 100755 --- a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb +++ b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb @@ -23,5 +23,5 @@ class MethodBodyConstVisitorGen < CppGen end end -MethodBodyConstVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyConstVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb index 4944b10a84..a74b0c06d6 100755 --- a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb +++ b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb @@ -31,5 +31,5 @@ class MethodBodyDefaultVisitorGen < CppGen end end -MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/MethodHolder.rb b/qpid/cpp/rubygen/99-0/MethodHolder.rb index 7e915677b2..a708db6676 100755 --- a/qpid/cpp/rubygen/templates/MethodHolder.rb +++ b/qpid/cpp/rubygen/99-0/MethodHolder.rb @@ -96,5 +96,5 @@ EOS end end -MethodHolderGen.new(Outdir, Amqp).generate(); +MethodHolderGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/Operations.rb b/qpid/cpp/rubygen/99-0/Operations.rb index 91007ef3e1..c985bb6105 100755 --- a/qpid/cpp/rubygen/templates/Operations.rb +++ b/qpid/cpp/rubygen/99-0/Operations.rb @@ -91,6 +91,6 @@ EOS end end -OperationsGen.new("client",ARGV[0], Amqp).generate() -OperationsGen.new("server",ARGV[0], Amqp).generate() +OperationsGen.new("client",ARGV[0], $amqp).generate() +OperationsGen.new("server",ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/OperationsInvoker.rb b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb index 747dd06189..642f98ce8e 100755 --- a/qpid/cpp/rubygen/templates/OperationsInvoker.rb +++ b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb @@ -88,5 +88,5 @@ class OperationsInvokerGen < CppGen end end -OperationsInvokerGen.new("client",ARGV[0], Amqp).generate() -OperationsInvokerGen.new("server",ARGV[0], Amqp).generate() +OperationsInvokerGen.new("client",ARGV[0], $amqp).generate() +OperationsInvokerGen.new("server",ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/Proxy.rb b/qpid/cpp/rubygen/99-0/Proxy.rb index 467476506c..2829884673 100755 --- a/qpid/cpp/rubygen/templates/Proxy.rb +++ b/qpid/cpp/rubygen/99-0/Proxy.rb @@ -62,7 +62,9 @@ EOS include "<sstream>" include "#{@classname}.h" include "qpid/framing/amqp_types_full.h" - Amqp.methods_on(@chassis).each { |m| include "qpid/framing/"+m.body_name } + @amqp.methods_on(@chassis).each { + |m| include "qpid/framing/"+m.body_name + } genl namespace("qpid::framing") { genl "#{@classname}::#{@classname}(FrameHandler& f) :" @@ -75,6 +77,6 @@ EOS end -ProxyGen.new("client", Outdir, Amqp).generate; -ProxyGen.new("server", Outdir, Amqp).generate; +ProxyGen.new("client", $outdir, $amqp).generate; +ProxyGen.new("server", $outdir, $amqp).generate; diff --git a/qpid/cpp/rubygen/templates/Session.rb b/qpid/cpp/rubygen/99-0/Session.rb index 6a50fdb462..e01a28a62d 100644 --- a/qpid/cpp/rubygen/templates/Session.rb +++ b/qpid/cpp/rubygen/99-0/Session.rb @@ -190,6 +190,6 @@ EOS end end -SessionNoKeywordGen.new(ARGV[0], Amqp).generate() -SessionGen.new(ARGV[0], Amqp).generate() +SessionNoKeywordGen.new(ARGV[0], $amqp).generate() +SessionGen.new(ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/templates/all_method_bodies.rb b/qpid/cpp/rubygen/99-0/all_method_bodies.rb index d06f459493..5971d49189 100755 --- a/qpid/cpp/rubygen/templates/all_method_bodies.rb +++ b/qpid/cpp/rubygen/99-0/all_method_bodies.rb @@ -17,5 +17,5 @@ class AllMethodBodiesGen < CppGen end end -AllMethodBodiesGen.new(Outdir, Amqp).generate(); +AllMethodBodiesGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/constants.rb b/qpid/cpp/rubygen/99-0/constants.rb index 5fbbefe218..b5f559d504 100755 --- a/qpid/cpp/rubygen/templates/constants.rb +++ b/qpid/cpp/rubygen/99-0/constants.rb @@ -78,5 +78,5 @@ class ConstantsGen < CppGen end end -ConstantsGen.new(Outdir, Amqp).generate(); +ConstantsGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/templates/frame_body_lists.rb b/qpid/cpp/rubygen/99-0/frame_body_lists.rb index 634001ab14..b20e4550f3 100644 --- a/qpid/cpp/rubygen/templates/frame_body_lists.rb +++ b/qpid/cpp/rubygen/99-0/frame_body_lists.rb @@ -26,6 +26,6 @@ EOS end end -FrameBodyListsGen.new(ARGV[0], Amqp).generate; +FrameBodyListsGen.new(ARGV[0], $amqp).generate; diff --git a/qpid/cpp/rubygen/templates/structs.rb b/qpid/cpp/rubygen/99-0/structs.rb index edbadb01f6..336591be00 100644 --- a/qpid/cpp/rubygen/templates/structs.rb +++ b/qpid/cpp/rubygen/99-0/structs.rb @@ -534,5 +534,5 @@ EOS end end -StructGen.new(ARGV[0], Amqp).generate() +StructGen.new(ARGV[0], $amqp).generate() diff --git a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb index d1212153ca..1fff1d51db 100755 --- a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb +++ b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb @@ -30,5 +30,5 @@ class MethodBodyDefaultVisitorGen < CppGen end end -MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate(); +MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate(); diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb index f998909ec8..b1e635a27b 100755 --- a/qpid/cpp/rubygen/amqpgen.rb +++ b/qpid/cpp/rubygen/amqpgen.rb @@ -119,7 +119,16 @@ class AmqpElement # List of children of type elname, or all children if elname # not specified. def children(elname=nil) - @cache_children[elname] ||= @children.select { |c| elname==c.xml.name } + if elname + @cache_children[elname] ||= @children.select { |c| elname==c.xml.name } + else + @children + end + end + + def each_descendant(&block) + yield self + @children.each { |c| c.each_descendant(&block) } end # Look up child of type elname with attribute name. @@ -127,6 +136,9 @@ class AmqpElement @cache_child[[elname,name]] ||= children(elname).find { |c| c.name==name } end + # Fully qualified amqp dotted name of this element + def dotted_name() (parent ? parent.dotted_name+"." : "") + name; end + # The root <amqp> element. def root() @root ||=parent ? parent.root : self; end @@ -140,9 +152,22 @@ class AmqpElement # Text of doc child if there is one. def doc() d=xml.elements["doc"]; d and d.text; end + def fqname() + throw "fqname: #{self} #{parent.fqname} has no name" unless name + p=parent && parent.fqname + p ? p+"."+name : name; + end + + def containing_class() + return self if is_a? AmqpClass + return parent && parent.containing_class + end + end -AmqpResponse = AmqpElement +class AmqpResponse < AmqpElement + def initialize(xml, parent) super; end +end class AmqpDoc < AmqpElement def initialize(xml,parent) super; end @@ -153,18 +178,32 @@ class AmqpChoice < AmqpElement def initialize(xml,parent) super; end amqp_attr_reader :name, :value end - + class AmqpEnum < AmqpElement def initialize(xml,parent) super; end amqp_child_reader :choice end +# 0-10 array domains are missing element type information, add it here. +ArrayTypes={ + "str16-array" => "str-16", + "amqp-host-array" => "connection.amqp-host-url", + "command-fragments" => "session.command-fragment", + "in-doubt" => "dtx.xid" +} + class AmqpDomain < AmqpElement - def initialize(xml, parent) super; end + def initialize(xml, parent) + super + root.used_by[uses].push(fqname) if uses and uses.index('.') + end + amqp_attr_reader :type amqp_single_child_reader :struct # preview amqp_single_child_reader :enum + def uses() type_=="array" ? ArrayTypes[name] : type_; end + def unalias() d=self while (d.type_ != d.name and root.domain(d.type_)) @@ -180,10 +219,13 @@ class AmqpException < AmqpElement end class AmqpField < AmqpElement - def initialize(xml, amqp) super; end; + def initialize(xml, amqp) + super; + root.used_by[type_].push(parent.fqname) if type_ and type_.index('.') + end + def domain() root.domain(xml.attributes["domain"]); end amqp_single_child_reader :struct # preview - # FIXME aconway 2008-02-21: exceptions in fields - need to update c++ mapping. amqp_child_reader :exception amqp_attr_reader :type, :default, :code, :required end @@ -198,15 +240,11 @@ class AmqpConstant < AmqpElement amqp_attr_reader :value, :class end -# FIXME aconway 2008-02-21: -# class AmqpResponse < AmqpElement -# def initialize(xml, parent) super; end -# end - class AmqpResult < AmqpElement def initialize(xml, parent) super; end amqp_single_child_reader :struct # preview amqp_attr_reader :type + def name() "result"; end end class AmqpEntry < AmqpElement @@ -236,8 +274,8 @@ class AmqpStruct < AmqpElement amqp_attr_reader :size, :code, :pack amqp_child_reader :field - alias :raw_pack :pack # preview - preview code needs default "short" for pack. + alias :raw_pack :pack def pack() raw_pack or (not parent.final? and "short"); end def result?() parent.xml.name == "result"; end def domain?() parent.xml.name == "domain"; end @@ -265,17 +303,21 @@ class AmqpRole < AmqpElement amqp_attr_reader :implement end -class AmqpControl < AmqpElement +# Base class for command and control. +class AmqpAction < AmqpElement def initialize(xml,amqp) super; end amqp_child_reader :implement, :field, :response amqp_attr_reader :code end -class AmqpCommand < AmqpElement +class AmqpControl < AmqpAction + def initialize(xml,amqp) super; end +end + +class AmqpCommand < AmqpAction def initialize(xml,amqp) super; end - amqp_child_reader :implement, :field, :exception, :response + amqp_child_reader :exception amqp_single_child_reader :result, :segments - amqp_attr_reader :code end class AmqpClass < AmqpElement @@ -296,12 +338,19 @@ class AmqpClass < AmqpElement def l4?() # preview !["connection", "session", "execution"].include?(name) end + + def actions() controls+commands; end end class AmqpType < AmqpElement + def initialize(xml,amqp) super; end amqp_attr_reader :code, :fixed_width, :variable_width end +class AmqpXref < AmqpElement + def initialize(xml,amqp) super; end +end + # AMQP root element. class AmqpRoot < AmqpElement amqp_attr_reader :major, :minor, :port, :comment @@ -314,14 +363,16 @@ class AmqpRoot < AmqpElement raise "No XML spec files." if specs.empty? xml=parse(specs.shift) specs.each { |s| xml_merge(xml, parse(s)) } + @used_by=Hash.new{ |h,k| h[k]=[] } super(xml, nil) end + attr_reader :used_by + + def merge(root) xml_merge(xml, root.xml); end + def version() major + "-" + minor; end - # Find a child node from a dotted amqp name, e.g. message.transfer - def lookup(dotted_name) elements[dotted_name.gsub(/\./,"/")]; end - # preview - only struct child reader remains for new mapping def domain_structs() domains.map{ |d| d.struct }.compact; end def result_structs() @@ -337,6 +388,8 @@ class AmqpRoot < AmqpElement @methods_on[chassis] ||= classes.map { |c| c.methods_on(chassis) }.flatten end + def fqname() nil; end + # TODO aconway 2008-02-21: methods by role. private @@ -353,7 +406,7 @@ end # Collect information about generated files. class GenFiles @@files =[] - def GenFiles.add(f) @@files << f; puts f; end + def GenFiles.add(f) @@files << f; end def GenFiles.get() @@files; end end @@ -366,26 +419,38 @@ class Generator def initialize (outdir, amqp) @amqp=amqp @outdir=outdir - @prefix='' # For indentation or comments. + @prefix=[''] # For indentation or comments. @indentstr=' ' # One indent level. @outdent=2 - Pathname.new(@outdir).mkpath unless @outdir=="-" or File.directory?(@outdir) + Pathname.new(@outdir).mkpath unless @outdir=="-" end # Create a new file, set @out. - def file(file) + def file(file, &block) GenFiles.add file if (@outdir != "-") - path=Pathname.new "#{@outdir}/#{file}" - path.parent.mkpath - path.open('w') { |@out| yield } + @path=Pathname.new "#{@outdir}/#{file}" + @path.parent.mkpath + @out=String.new # Generate in memory first + if block then yield; endfile; end + end + end + + def endfile() + if @outdir != "-" + if @path.exist? and @path.read == @out + puts "Skipped #{@path} - unchanged" # Dont generate if unchanged + else + @path.open('w') { |f| f << @out } + puts "Generated #{@path}" + end end end # Append multi-line string to generated code, prefixing each line. def gen (str) str.each_line { |line| - @out << @prefix unless @midline + @out << @prefix.last unless @midline @out << line @midline = nil } @@ -400,23 +465,25 @@ class Generator end # Generate code with added prefix. - def prefix(add) - save=@prefix - @prefix+=add - yield - @prefix=save + def prefix(add, &block) + @prefix.push @prefix.last+add + if block then yield; endprefix; end + end + + def endprefix() + @prefix.pop end # Generate indented code def indent(n=1,&block) prefix(@indentstr * n,&block); end + alias :endindent :endprefix # Generate outdented code def outdent(&block) - save=@prefix - @prefix=@prefix[0...-2] - yield - @prefix=save + @prefix.push @prefix.last[0...-2] + if block then yield; endprefix; end end + alias :endoutdent :endprefix attr_accessor :out end diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb index 60a653e18d..edee72e0bd 100755 --- a/qpid/cpp/rubygen/cppgen.rb +++ b/qpid/cpp/rubygen/cppgen.rb @@ -54,12 +54,31 @@ CppKeywords = Set.new(["and", "and_eq", "asm", "auto", "bitand", CppMangle = CppKeywords+Set.new(["string"]) class String - def cppsafe() - CppMangle.include?(self) ? self+"_" : self + def cppsafe() CppMangle.include?(self) ? self+"_" : self; end + + def amqp2cpp() + path=split(".") + name=path.pop + return name.typename if path.empty? + path.map! { |n| n.nsname } + return (path << name.caps).join("::") end + + alias :typename :caps + alias :nsname :bars + alias :constname :shout + alias :funcname :lcaps + alias :varname :lcaps end # Hold information about a C++ type. +# +# preview - new mapping does not use CppType, +# Each amqp type corresponds exactly by dotted name +# to a type, domain or struct, which in turns +# corresponds by name to a C++ type or typedef. +# (see String.amqp2cpp) +# class CppType def initialize(name) @name=@param=@ret=name; end attr_reader :name, :param, :ret, :code @@ -93,11 +112,22 @@ class CppType def to_s() name; end; end +class AmqpElement + def cppfqname() + names=parent.dotted_name.split(".") + # Field children are moved up to method in C++b + prefix.pop if parent.is_a? AmqpField + prefix.push cppname + prefix.join("::") + end +end + class AmqpField def cppname() name.lcaps.cppsafe; end def cpptype() domain.cpptype; end def bit?() domain.type_ == "bit"; end def signature() cpptype.param+" "+cppname; end + def paramtype() "call_traits<#{type_.amqp2cpp}>::param_type"; end end class AmqpMethod @@ -107,8 +137,41 @@ class AmqpMethod def body_name() parent.name.caps+name.caps+"Body"; end end +module AmqpHasFields + def parameters() + fields.map { |f| "#{f.paramtype} #{f.cppname}_"}.join(",\n") + end + + def arguments() + fields.map { |f| "#{f.cppname}_"}.join(",\n") + end + + def values() + fields.map { |f| "#{f.cppname}"}.join(",\n") + end + + def initializers() + fields.map { |f| "#{f.cppname}(#{f.cppname}_)"}.join(",\n") + end +end + +class AmqpAction + def classname() name.typename; end + def funcname() parent.name.funcname + name.caps; end + include AmqpHasFields +end + +class AmqpCommand < AmqpAction + def base() "Command"; end +end + +class AmqpControl < AmqpAction + def base() "Control"; end +end + class AmqpClass - def cppname() name.caps; end + def cppname() name.caps; end # preview + def nsname() name.nsname; end end class AmqpDomain @@ -150,18 +213,26 @@ class AmqpResult end class AmqpStruct - def cpp_pack_type() AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t"); end - def cpptype() parent.cpptype; end - def cppname() cpptype.name; end + include AmqpHasFields + + def cpp_pack_type() # preview + AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t"); + end + def cpptype() parent.cpptype; end # preview + def cppname() cpptype.name; end # preview + + def classname() name.typename; end end class CppGen < Generator def initialize(outdir, *specs) super(outdir,*specs) + # need to sort classes for dependencies + @actions=[] # Stack of end-scope actions end # Write a header file. - def h_file(path) + def h_file(path, &block) path = (/\.h$/ === path ? path : path+".h") guard=path.upcase.tr('./-','_') file(path) { @@ -169,12 +240,12 @@ class CppGen < Generator gen "#define #{guard}\n" gen Copyright yield - gen "#endif /*!#{guard}*/\n" + gen "#endif /*!#{guard}*/\n" } end # Write a .cpp file. - def cpp_file(path) + def cpp_file(path, &block) path = (/\.cpp$/ === path ? path : path+".cpp") file(path) do gen Copyright @@ -188,10 +259,12 @@ class CppGen < Generator genl "#include #{header}" end - def scope(open="{",close="}", &block) - genl open; indent(&block); genl close + def scope(open="{",close="}", &block) + genl open + indent &block + genl close end - + def namespace(name, &block) genl names = name.split("::") @@ -210,7 +283,7 @@ class CppGen < Generator indent { gen "#{bases.join(",\n")}" } end genl - scope("{","};") { yield } + scope("{","};", &block) end def struct(name, *bases, &block) @@ -242,6 +315,11 @@ class CppGen < Generator prefix(" * ",&block) genl " */" end + + # Generate code in namespace for each class + def each_class_ns() + @amqp.classes.each { |c| namespace(c.nsname) { yield c } } + end end # Fully-qualified class name diff --git a/qpid/cpp/rubygen/generate b/qpid/cpp/rubygen/generate index 94b194aaa4..d094be4f41 100755 --- a/qpid/cpp/rubygen/generate +++ b/qpid/cpp/rubygen/generate @@ -1,5 +1,6 @@ #!/usr/bin/env ruby require 'amqpgen' +require 'pathname' # # Run a set of code generation templates. @@ -18,17 +19,39 @@ EOS exit 1 end -Outdir=ARGV[0] -Specs=ARGV.grep(/\.xml$/) -Amqp=AmqpRoot.new(*Specs) +# Create array of specs by version +def parse_specs(specs) + roots={ } + specs.each { |spec| + root=AmqpRoot.new(spec) + ver=root.version + if (roots[ver]) + roots[ver].merge(root) + else + roots[ver]=root + end + } + roots +end # Run selected templates if ARGV.any? { |arg| arg=="all" } - templates=Dir["#{File.dirname __FILE__}/templates/*.rb"] + templates=Dir["#{File.dirname __FILE__}/*/*.rb"] else templates=ARGV.grep(/\.rb$/) end -templates.each { |t| load t } + +$outdir=ARGV[0] +$models=parse_specs(ARGV.grep(/\.xml$/)) +templates.each { |t| + ver=Pathname.new(t).dirname.basename.to_s + $amqp=$models[ver] + if $amqp + load t + else + puts "Warning: skipping #{t}, no spec file for version #{ver}." + end +} def make_continue(lines) lines.join(" \\\n "); end @@ -40,7 +63,7 @@ if makefile generator_files=Dir["**/*.rb"] << File.basename(__FILE__) Dir.chdir dir rgen_generator=generator_files.map{ |f| "$(rgen_dir)/#{f}" } - rgen_srcs=GenFiles.get.map{ |f| "#{Outdir}/#{f}" } + rgen_srcs=GenFiles.get.map{ |f| "#{$outdir}/#{f}" } File.open(makefile, 'w') { |out| out << <<EOS @@ -51,13 +74,13 @@ rgen_generator=#{make_continue rgen_generator} rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))} -rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))} +rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r{qpid/(framing|amqp_.+)/.+\.cpp$}))} rgen_srcs=#{make_continue rgen_srcs} # Header file install rules. EOS - ["framing", "client/no_keyword","client", "broker"].each { |ns| + ["amqp_0_10", "framing", "client/no_keyword","client", "broker"].each { |ns| dir="qpid/#{ns}" dir_ = dir.tr("/", "_") regex=%r|#{dir}/[^/]+\.h$| @@ -69,7 +92,7 @@ EOS } out << <<EOS if GENERATE -$(rgen_srcs) $(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs) +$(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs) $(rgen_cmd) # Empty rule in case a generator file is renamed/removed. diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index b44aaa7e5e..e3b95e045f 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -11,11 +11,13 @@ EXTRA_DIST= $(platform_dist) # This phony target is needed by generated makefile fragments: force: -# AMQP_XML is defined in ../configure.ac -specs=@AMQP_XML@ $(top_srcdir)/xml/cluster.xml - if GENERATE +# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac +amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml +amqp_0_10_xml=@AMQP_FINAL_XML@ +specs=$(amqp_99_0_xml) $(amqp_0_10_xml) + # Ruby generator. rgen_dir=$(top_srcdir)/rubygen rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate $(srcdir)/gen $(specs) all $(srcdir)/rubygen.mk @@ -101,6 +103,7 @@ libqpidcommon_la_LIBADD = \ libqpidcommon_la_SOURCES = \ $(rgen_common_cpp) \ $(platform_src) \ + qpid/amqp_0_10/helpers.cpp \ qpid/Serializer.h \ qpid/amqp_0_10/built_in_types.h \ qpid/amqp_0_10/Codec.h \ @@ -166,6 +169,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PreviewConnection.cpp \ qpid/broker/PreviewConnectionHandler.cpp \ qpid/broker/PreviewSessionHandler.cpp \ + qpid/broker/PreviewSessionManager.cpp \ + qpid/broker/PreviewSessionState.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ @@ -247,6 +252,7 @@ libqpidclient_la_SOURCES = \ nobase_include_HEADERS = \ $(platform_hdr) \ + qpid/amqp_0_10/helpers.h \ qpid/assert.h \ qpid/DataDir.h \ qpid/Exception.h \ @@ -270,6 +276,8 @@ nobase_include_HEADERS = \ qpid/broker/PreviewConnection.h \ qpid/broker/PreviewConnectionHandler.h \ qpid/broker/PreviewSessionHandler.h \ + qpid/broker/PreviewSessionManager.h \ + qpid/broker/PreviewSessionState.h \ qpid/broker/Connection.h \ qpid/broker/ConnectionState.h \ qpid/broker/ConnectionFactory.h \ diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp index 07f157cfc3..17f0d5029c 100644 --- a/qpid/cpp/src/qpid/Exception.cpp +++ b/qpid/cpp/src/qpid/Exception.cpp @@ -32,20 +32,31 @@ std::string strError(int err) { return std::string(strerror_r(err, buf, sizeof(buf))); } -Exception::Exception(const std::string& s) throw() : msg(s) { - QPID_LOG(warning, "Exception: " << msg); +Exception::Exception(const std::string& msg, + const std::string& nm, + uint16_t cd) throw() + : message(msg), name(nm), code(cd), + whatStr((name.empty() ? "" : name + ": ")+ msg) +{ + QPID_LOG(warning, "Exception: " << whatStr); } Exception::~Exception() throw() {} -std::string Exception::str() const throw() { - if (msg.empty()) - const_cast<std::string&>(msg).assign(typeid(*this).name()); - return msg; +std::string Exception::getMessage() const throw() { return message; } + +std::string Exception::getName() const throw() { + return name.empty() ? typeid(*this).name() : name; } -const char* Exception::what() const throw() { return str().c_str(); } +uint16_t Exception::getCode() const throw() { return code; } + +const char* Exception::what() const throw() { + if (whatStr.empty()) return typeid(*this).name(); + else return whatStr.c_str(); +} -const std::string ClosedException::CLOSED_MESSAGE("Closed"); +ClosedException::ClosedException(const std::string& msg) + : Exception(msg, "ClosedException") {} } // namespace qpid diff --git a/qpid/cpp/src/qpid/Exception.h b/qpid/cpp/src/qpid/Exception.h index 57c7a21234..00365018ba 100644 --- a/qpid/cpp/src/qpid/Exception.h +++ b/qpid/cpp/src/qpid/Exception.h @@ -40,13 +40,27 @@ std::string strError(int err); class Exception : public std::exception { public: - explicit Exception(const std::string& str=std::string()) throw(); + explicit Exception(const std::string& message=std::string(), + const std::string& name=std::string(), + uint16_t code=0) throw(); + virtual ~Exception() throw(); + + // returns "name: message" + virtual const char* what() const throw(); + + virtual std::string getName() const throw(); + virtual std::string getMessage() const throw(); + virtual uint16_t getCode() const throw(); + + // FIXME aconway 2008-02-21: backwards compat, remove? + std::string str() const throw() { return getMessage(); } - virtual const char *what() const throw(); - virtual std::string str() const throw(); private: - std::string msg; + const std::string message; + const std::string name; + const uint16_t code; + const std::string whatStr; }; struct ChannelException : public Exception { @@ -62,8 +76,7 @@ struct ConnectionException : public Exception { }; struct ClosedException : public Exception { - static const std::string CLOSED_MESSAGE; - ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {} + ClosedException(const std::string& msg=std::string()); }; } // namespace qpid diff --git a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h index 6cd9c72367..445f07459c 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h +++ b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h @@ -29,6 +29,7 @@ #include <boost/array.hpp> #include <stdint.h> #include <string> +#include <vector> /**@file Mapping from built-in AMQP types to C++ types */ @@ -66,7 +67,7 @@ typedef double Double; typedef float Float; typedef framing::SequenceNumber SequenceNo; using framing::Uuid; -typedef sys::AbsTime DateTime; +typedef sys::AbsTime Datetime; typedef Decimal<Uint8, Int32> Dec32; typedef Decimal<Uint8, Int64> Dec64; @@ -89,14 +90,18 @@ typedef CodableString<Uint16, Uint16> Str16Utf16; typedef CodableString<Uint8, Uint32> Vbin32; -/** FIXME aconway 2008-02-20: todo -byte-ranges 2 byte ranges within a 64-bit payload -sequence-set 2 ranged set representation -map 0xa8 4 a mapping of keys to typed values -list 0xa9 4 a series of consecutive type-value pairs -array 0xaa 4 a defined length collection of values of a single type -struct32 0xab 4 a coded struct with a 32-bit size -*/ +// FIXME aconway 2008-02-26: array encoding +template <class T> struct Array : public std::vector<T> {}; + +// FIXME aconway 2008-02-26: Unimplemented types: +struct ByteRanges {}; +struct SequenceSet {}; +struct Map {}; +struct List {}; +struct Struct32 {}; + +// Top level enum definitions. +enum SegmentType { CONTROL, COMMAND, HEADER, BODY }; }} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp new file mode 100644 index 0000000000..4333a2cd92 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "helpers.h" + +namespace qpid { +namespace amqp_0_10 { + +Control::~Control() {} +Command::~Command() {} +Struct::~Struct() {} + +}} // namespace qpid::amqp_0_10 diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.h b/qpid/cpp/src/qpid/amqp_0_10/helpers.h new file mode 100644 index 0000000000..1769d374d9 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.h @@ -0,0 +1,67 @@ +#ifndef QPID_AMQP_0_10_HELPERS_H +#define QPID_AMQP_0_10_HELPERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the +n * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +namespace qpid { + +namespace amqp_0_10 { + +// Look up names by code +const char* getClassName(uint8_t code); +const char* getCommandName(uint8_t classCode, uint8_t code); +const char* getControlName(uint8_t classCode, uint8_t code); +const char* getStructName(uint8_t classCode, uint8_t code); + +struct Command { + virtual ~Command(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +struct Control { + virtual ~Control(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +struct Struct { + virtual ~Struct(); + class Visitor; + virtual void accept(Visitor&) const = 0; +}; + +/** Base class for generated enum domains. + * Enums map to classes for type safety and to provide separate namespaces + * for clashing values. + */ +struct Enum { + int value; + Enum(int v=0) : value(v) {} + operator int() const { return value; } + template <class S> void serialize(S &s) { s(value); } +}; + +}} // namespace qpid::amqp_0_10 + +#endif /*!QPID_AMQP_0_10_HELPERS_H*/ diff --git a/qpid/cpp/src/qpid/amqp_0_10/visitors.h b/qpid/cpp/src/qpid/amqp_0_10/visitors.h new file mode 100644 index 0000000000..3835f37f3e --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/visitors.h @@ -0,0 +1,15 @@ +// Visitors +template <class Base> struct Visitor; +template <class Base, class F, class R> FunctorVisitor; + +/** Template base implementation for visitables. */ +template <class Base, class Derived> +struct VisitableBase : public Base { + virtual void accept(Visitor<Derived>& v) { + v.visit(static_cast<Derived>&(*this)); + } + virtual void accept(Visitor<Derived>& v) const { + v.visit(static_cast<const Derived>&(*this)); + } +}; + diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 0a0eb0a0df..9bfa868d9c 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -109,7 +109,8 @@ Broker::Broker(const Broker::Options& conf) : store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), factory(*this), - sessionManager(conf.ack) + sessionManager(conf.ack), + previewSessionManager(conf.ack) { // Early-Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 55bc7644a5..153eabc6b3 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -30,6 +30,7 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" +#include "PreviewSessionManager.h" #include "Vhost.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" @@ -109,6 +110,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DataDir& getDataDir() { return dataDir; } SessionManager& getSessionManager() { return sessionManager; } + PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; } management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -136,6 +138,7 @@ class Broker : public sys::Runnable, public Plugin::Target, ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; + PreviewSessionManager previewSessionManager; management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h index c8c1e12f28..ef2c51bb8d 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h @@ -68,7 +68,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } - framing::ProtocolVersion getVersion() const { return session.getVersion();} + framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();} AccessHandler* getAccessHandler() { diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h index 410d400c9d..4c51e2a826 100644 --- a/qpid/cpp/src/qpid/broker/HandlerImpl.h +++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h @@ -20,7 +20,7 @@ */ #include "SemanticState.h" -#include "SessionState.h" +#include "SessionContext.h" #include "ConnectionState.h" namespace qpid { @@ -35,13 +35,13 @@ class Broker; class HandlerImpl { protected: SemanticState& state; - SessionState& session; + SessionContext& session; HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } ConnectionState& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getBroker(); } + Broker& getBroker() { return session.getConnection().getBroker(); } }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp index 19e6a235c4..36092bb7f6 100644 --- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp @@ -19,7 +19,7 @@ */ #include "PreviewSessionHandler.h" -#include "SessionState.h" +#include "PreviewSessionState.h" #include "PreviewConnection.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" @@ -36,7 +36,7 @@ using namespace std; using namespace qpid::sys; PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -106,15 +106,15 @@ void PreviewSessionHandler::assertClosed(const char* method) const { void PreviewSessionHandler::open(uint32_t detachedLifetime) { assertClosed("open"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); + std::auto_ptr<PreviewSessionState> state( + connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); peerSession.attached(session->getId(), session->getTimeout()); } void PreviewSessionHandler::resume(const Uuid& id) { assertClosed("resume"); - session = connection.broker.getSessionManager().resume(id); + session = connection.broker.getPreviewSessionManager().resume(id); session->attach(*this); SequenceNumber seq = session->resuming(); peerSession.attached(session->getId(), session->getTimeout()); @@ -154,7 +154,7 @@ void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) void PreviewSessionHandler::localSuspend() { if (session.get() && session->isAttached()) { session->detach(); - connection.broker.getSessionManager().suspend(session); + connection.broker.getPreviewSessionManager().suspend(session); session.reset(); } } @@ -171,7 +171,7 @@ void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark, const SequenceNumberSet& /*seenFrameSet*/) { assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { + if (session->getState() == PreviewSessionState::RESUMING) { session->receivedAck(cumulativeSeenMark); framing::SessionState::Replay replay=session->replay(); std::for_each(replay.begin(), replay.end(), @@ -193,14 +193,14 @@ void PreviewSessionHandler::solicitAck() { void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) { - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); + std::auto_ptr<PreviewSessionState> state( + connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); } void PreviewSessionHandler::detached() { - connection.broker.getSessionManager().suspend(session); + connection.broker.getPreviewSessionManager().suspend(session); session.reset(); } diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h index e1096ebf9f..4c517367d7 100644 --- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h +++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h @@ -36,7 +36,7 @@ namespace qpid { namespace broker { class PreviewConnection; -class SessionState; +class PreviewSessionState; /** * A SessionHandler is associated with each active channel. It @@ -45,7 +45,7 @@ class SessionState; */ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, - public SessionContext, + public framing::FrameHandler::InOutHandler, private boost::noncopyable { public: @@ -53,8 +53,8 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand ~PreviewSessionHandler(); /** Returns 0 if not attached to a session */ - SessionState* getSession() { return session.get(); } - const SessionState* getSession() const { return session.get(); } + PreviewSessionState* getSession() { return session.get(); } + const PreviewSessionState* getSession() const { return session.get(); } framing::ChannelId getChannel() const { return channel.get(); } @@ -101,7 +101,7 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand framing::AMQP_ClientProxy proxy; framing::AMQP_ClientProxy::Session peerSession; bool ignoring; - std::auto_ptr<SessionState> session; + std::auto_ptr<PreviewSessionState> session; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp new file mode 100644 index 0000000000..ec73082817 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PreviewSessionManager.h" +#include "PreviewSessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/log/Helpers.h" +#include "qpid/memory.h" + +#include <boost/bind.hpp> +#include <boost/range.hpp> + +#include <algorithm> +#include <functional> +#include <ostream> + +namespace qpid { +namespace broker { + +using namespace sys; +using namespace framing; + +PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {} + +PreviewSessionManager::~PreviewSessionManager() {} + +// FIXME aconway 2008-02-01: pass handler*, allow open unattached. +std::auto_ptr<PreviewSessionState> PreviewSessionManager::open( + PreviewSessionHandler& h, uint32_t timeout_) +{ + Mutex::ScopedLock l(lock); + std::auto_ptr<PreviewSessionState> session( + new PreviewSessionState(this, &h, timeout_, ack)); + active.insert(session->getId()); + for_each(observers.begin(), observers.end(), + boost::bind(&Observer::opened, _1,boost::ref(*session))); + return session; +} + +void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) { + Mutex::ScopedLock l(lock); + active.erase(session->getId()); + session->suspend(); + session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); + if (session->mgmtObject.get() != 0) + session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); + suspended.push_back(session.release()); // In expiry order + eraseExpired(); +} + +std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id) +{ + Mutex::ScopedLock l(lock); + eraseExpired(); + if (active.find(id) != active.end()) + throw SessionBusyException( + QPID_MSG("Session already active: " << id)); + Suspended::iterator i = std::find_if( + suspended.begin(), suspended.end(), + boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1)) + ); + if (i == suspended.end()) + throw InvalidArgumentException( + QPID_MSG("No suspended session with id=" << id)); + active.insert(id); + std::auto_ptr<PreviewSessionState> state(suspended.release(i).release()); + return state; +} + +void PreviewSessionManager::erase(const framing::Uuid& id) +{ + Mutex::ScopedLock l(lock); + active.erase(id); +} + +void PreviewSessionManager::eraseExpired() { + // Called with lock held. + if (!suspended.empty()) { + Suspended::iterator keep = std::lower_bound( + suspended.begin(), suspended.end(), now(), + boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2)); + if (suspended.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); + suspended.erase(suspended.begin(), keep); + } + } +} + +void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) { + observers.push_back(o); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.h b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h new file mode 100644 index 0000000000..65ca49ec89 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h @@ -0,0 +1,100 @@ +#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H +#define QPID_BROKER_PREVIEWSESSIONMANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/framing/Uuid.h> +#include <qpid/sys/Time.h> +#include <qpid/sys/Mutex.h> +#include <qpid/RefCounted.h> + +#include <boost/noncopyable.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +#include <set> +#include <vector> +#include <memory> + +namespace qpid { +namespace broker { + +class PreviewSessionState; +class PreviewSessionHandler; + +/** + * Create and manage PreviewSessionState objects. + */ +class PreviewSessionManager : private boost::noncopyable { + public: + /** + * Observer notified of PreviewSessionManager events. + */ + struct Observer : public RefCounted { + virtual void opened(PreviewSessionState&) {} + }; + + PreviewSessionManager(uint32_t ack); + + ~PreviewSessionManager(); + + /** Open a new active session, caller takes ownership */ + std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_); + + /** Suspend a session, start it's timeout counter. + * The factory takes ownership. + */ + void suspend(std::auto_ptr<PreviewSessionState> session); + + /** Resume a suspended session. + *@throw Exception if timed out or non-existant. + */ + std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&); + + /** Add an Observer. */ + void add(const intrusive_ptr<Observer>&); + + private: + typedef boost::ptr_vector<PreviewSessionState> Suspended; + typedef std::set<framing::Uuid> Active; + typedef std::vector<intrusive_ptr<Observer> > Observers; + + void erase(const framing::Uuid&); + void eraseExpired(); + + sys::Mutex lock; + Suspended suspended; + Active active; + uint32_t ack; + Observers observers; + + friend class PreviewSessionState; // removes deleted sessions from active set. +}; + + + +}} // namespace qpid::broker + + + + + +#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/ diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp new file mode 100644 index 0000000000..7188ffbf40 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PreviewSessionState.h" +#include "PreviewSessionManager.h" +#include "PreviewSessionHandler.h" +#include "ConnectionState.h" +#include "Broker.h" +#include "SemanticHandler.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace broker { + +using namespace framing; +using sys::Mutex; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; + +PreviewSessionState::PreviewSessionState( + PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack) + : framing::SessionState(ack, timeout_ > 0), + factory(f), handler(h), id(true), timeout(timeout_), + broker(h->getConnection().broker), + version(h->getConnection().getVersion()), + semanticHandler(new SemanticHandler(*this)) +{ + in.next = semanticHandler.get(); + out.next = &handler->out; + + getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + + Manageable* parent = broker.GetVhostObject (); + + if (parent != 0) + { + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + + if (agent.get () != 0) + { + mgmtObject = management::Session::shared_ptr + (new management::Session (this, parent, id.str ())); + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h->getChannel()); + mgmtObject->set_detachedLifespan (getTimeout()); + agent->addObject (mgmtObject); + } + } +} + +PreviewSessionState::~PreviewSessionState() { + // Remove ID from active session list. + if (factory) + factory->erase(getId()); + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +PreviewSessionHandler* PreviewSessionState::getHandler() { + return handler; +} + +AMQP_ClientProxy& PreviewSessionState::getProxy() { + assert(isAttached()); + return getHandler()->getProxy(); +} + +ConnectionState& PreviewSessionState::getConnection() { + assert(isAttached()); + return getHandler()->getConnection(); +} + +void PreviewSessionState::detach() { + getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + Mutex::ScopedLock l(lock); + handler = 0; out.next = 0; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (0); + } +} + +void PreviewSessionState::attach(PreviewSessionHandler& h) { + { + Mutex::ScopedLock l(lock); + handler = &h; + out.next = &handler->out; + if (mgmtObject.get() != 0) + { + mgmtObject->set_attached (1); + mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_channelId (h.getChannel()); + } + } + h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); +} + +void PreviewSessionState::activateOutput() +{ + Mutex::ScopedLock l(lock); + if (isAttached()) { + getConnection().outputTasks.activateOutput(); + } +} + //This class could be used as the callback for queue notifications + //if not attached, it can simply ignore the callback, else pass it + //on to the connection + +ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId, + Args& /*args*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case management::Session::METHOD_DETACH : + if (handler != 0) + { + handler->detach(); + } + status = Manageable::STATUS_OK; + break; + + case management::Session::METHOD_CLOSE : + /* + if (handler != 0) + { + handler->getConnection().closeChannel(handler->getChannel()); + } + status = Manageable::STATUS_OK; + break; + */ + + case management::Session::METHOD_SOLICITACK : + case management::Session::METHOD_RESETLIFESPAN : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} + + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.h b/qpid/cpp/src/qpid/broker/PreviewSessionState.h new file mode 100644 index 0000000000..6e8523317c --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.h @@ -0,0 +1,124 @@ +#ifndef QPID_BROKER_PREVIEWSESSION_H +#define QPID_BROKER_PREVIEWSESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Time.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Session.h" +#include "SessionContext.h" + +#include <boost/noncopyable.hpp> +#include <boost/scoped_ptr.hpp> + +#include <set> +#include <vector> +#include <ostream> + +namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + +namespace broker { + +class SemanticHandler; +class PreviewSessionHandler; +class PreviewSessionManager; +class Broker; +class ConnectionState; + +/** + * Broker-side session state includes sessions handler chains, which may + * themselves have state. + */ +class PreviewSessionState : public framing::SessionState, + public SessionContext, + public framing::FrameHandler::Chains, + public management::Manageable +{ + public: + ~PreviewSessionState(); + bool isAttached() { return handler; } + + void detach(); + void attach(PreviewSessionHandler& handler); + + + PreviewSessionHandler* getHandler(); + + /** @pre isAttached() */ + framing::AMQP_ClientProxy& getProxy(); + + /** @pre isAttached() */ + ConnectionState& getConnection(); + + uint32_t getTimeout() const { return timeout; } + Broker& getBroker() { return broker; } + framing::ProtocolVersion getVersion() const { return version; } + + /** OutputControl **/ + void activateOutput(); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); + + // Normally SessionManager creates sessions. + PreviewSessionState(PreviewSessionManager*, + PreviewSessionHandler* out, + uint32_t timeout, + uint32_t ackInterval); + + + private: + PreviewSessionManager* factory; + PreviewSessionHandler* handler; + framing::Uuid id; + uint32_t timeout; + sys::AbsTime expiry; // Used by SessionManager. + Broker& broker; + framing::ProtocolVersion version; + sys::Mutex lock; + boost::scoped_ptr<SemanticHandler> semanticHandler; + management::Session::shared_ptr mgmtObject; + + friend class PreviewSessionManager; +}; + + +inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) { + return out << session.getId(); +} + +}} // namespace qpid::broker + + + +#endif /*!QPID_BROKER_SESSION_H*/ diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp index 2a79496144..fdde7ec18c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp @@ -22,7 +22,6 @@ #include "SemanticHandler.h" #include "SemanticState.h" #include "SessionContext.h" -#include "SessionState.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" #include "qpid/framing/ExecutionCompleteBody.h" @@ -37,9 +36,9 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(SessionState& s) : +SemanticHandler::SemanticHandler(SessionContext& s) : state(*this,s), session(s), - msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()), + msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()), ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2)) {} @@ -164,13 +163,8 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { - SessionContext* handler = session.getHandler(); - if (handler) { - uint32_t maxFrameSize = handler->getConnection().getFrameMax(); - MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize); - } else { - QPID_LOG(error, "Dropping message as session is no longer attached to a channel."); - } + uint32_t maxFrameSize = session.getConnection().getFrameMax(); + MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); return outgoing.hwm; } diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h index d7f3ec8799..893a0cbded 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.h +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h @@ -46,7 +46,7 @@ class AMQHeaderBody; namespace broker { -class SessionState; +class SessionContext; class SemanticHandler : public DeliveryAdapter, public framing::FrameHandler, @@ -56,7 +56,7 @@ class SemanticHandler : public DeliveryAdapter, typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; SemanticState state; - SessionState& session; + SessionContext& session; // TODO aconway 2007-09-20: Why are these on the handler rather than the // state? IncomingExecutionContext incoming; @@ -78,10 +78,10 @@ class SemanticHandler : public DeliveryAdapter, framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } //Connection& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getBroker(); } + Broker& getBroker() { return session.getConnection().getBroker(); } public: - SemanticHandler(SessionState& session); + SemanticHandler(SessionContext& session); //frame handler: void handle(framing::AMQFrame& frame); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 7b4035604f..9b44f31e14 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -19,7 +19,7 @@ * */ -#include "SessionState.h" +#include "SessionContext.h" #include "BrokerAdapter.h" #include "Queue.h" #include "Connection.h" @@ -56,7 +56,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::ptr_map; -SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) +SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) : session(ss), deliveryAdapter(da), prefetchSize(0), @@ -263,21 +263,16 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { - if (parent->getSession().isAttached() && accept(msg.payload)) { - allocateCredit(msg.payload); - DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg, token); - if (windowing || ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); - } - if (acquire && !ackExpected) { - queue->dequeue(0, msg.payload); - } - return true; - } else { - QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent); - return false; + allocateCredit(msg.payload); + DeliveryId deliveryTag = + parent->deliveryAdapter.deliver(msg, token); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + } + if (acquire && !ackExpected) { + queue->dequeue(0, msg.payload); } + return true; } bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg) @@ -331,7 +326,7 @@ void SemanticState::cancel(ConsumerImpl& c) if(queue) { queue->cancel(c); if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - Queue::tryAutoDelete(getSession().getBroker(), queue); + Queue::tryAutoDelete(session.getBroker(), queue); } } } @@ -352,7 +347,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); if (!cacheExchange || cacheExchange->getName() != exchangeName){ - cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName); + cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 7fc6e4167c..cc9c0e1e9b 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -45,7 +45,7 @@ namespace qpid { namespace broker { -class SessionState; +class SessionContext; /** * SemanticState holds the L3 and L4 state of an open session, whether @@ -98,7 +98,7 @@ class SemanticState : public framing::FrameHandler::Chains, typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; - SessionState& session; + SessionContext& session; DeliveryAdapter& deliveryAdapter; Queue::shared_ptr defaultQueue; ConsumerImplMap consumers; @@ -129,10 +129,10 @@ class SemanticState : public framing::FrameHandler::Chains, void cancel(ConsumerImpl&); public: - SemanticState(DeliveryAdapter&, SessionState&); + SemanticState(DeliveryAdapter&, SessionContext&); ~SemanticState(); - SessionState& getSession() { return session; } + SessionContext& getSession() { return session; } /** * Get named queue, never returns 0. diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index a27b43cf65..a289310b15 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -25,6 +25,7 @@ #include "qpid/framing/FrameHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/OutputControl.h" #include "ConnectionState.h" @@ -33,17 +34,13 @@ namespace qpid { namespace broker { -class SessionContext : public framing::FrameHandler::InOutHandler +class SessionContext : public sys::OutputControl { public: - SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {} virtual ~SessionContext(){} virtual ConnectionState& getConnection() = 0; - virtual const ConnectionState& getConnection() const = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; - virtual const framing::AMQP_ClientProxy& getProxy() const = 0; - virtual void detach() = 0; - virtual framing::ChannelId getChannel() const = 0; + virtual Broker& getBroker() = 0; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 1cb10d0c19..0e3c9928d1 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,6 +21,7 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" +#include "ConnectionState.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" #include "qpid/framing/ClientInvoker.h" @@ -36,7 +37,7 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : SessionContext(c.getOutput()), + : InOutHandler(0, &out), connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. @@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) { // AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - return; - } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); - session->in.handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; - } else if (!ignoring) { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); + if (!ignoring) { + if (m && + (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || + invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + return; + } else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { + return; + } else { + throw ChannelErrorException( + QPID_MSG("Channel " << channel.get() << " is not open")); + } } } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. @@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) { } void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) + if (!session.get()) { + std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); + } } void SessionHandler::assertClosed(const char* method) const { @@ -208,4 +215,32 @@ void SessionHandler::detached() ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } +void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + assertAttached("complete"); + session->complete(cumulative, range); +} + +void SessionHandler::flush() +{ + assertAttached("flush"); + session->flush(); +} +void SessionHandler::sync() +{ + assertAttached("sync"); + session->sync(); +} + +void SessionHandler::noop() +{ + assertAttached("noop"); + session->noop(); +} + +void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +{ + //never actually sent by client at present +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 5a72bfb12d..e6bc463a82 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -28,7 +28,7 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelHandler.h" -#include "SessionContext.h" +#include "qpid/framing/SequenceNumber.h" #include <boost/noncopyable.hpp> @@ -36,16 +36,18 @@ namespace qpid { namespace broker { class Connection; +class ConnectionState; class SessionState; /** * A SessionHandler is associated with each active channel. It - * receives incoming frames, handles session commands and manages the + * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::AMQP_ClientOperations::SessionHandler, - public SessionContext, + public framing::AMQP_ServerOperations::ExecutionHandler, + public framing::FrameHandler::InOutHandler, private boost::noncopyable { public: @@ -90,12 +92,17 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); void detached(); + //Execution methods: + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); + void flush(); + void noop(); + void result(uint32_t command, const std::string& data); + void sync(); void assertAttached(const char* method) const; void assertActive(const char* method) const; void assertClosed(const char* method) const; - Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp index aa7ac9a8bb..571d3365db 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.cpp +++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp @@ -45,7 +45,7 @@ SessionManager::~SessionManager() {} // FIXME aconway 2008-02-01: pass handler*, allow open unattached. std::auto_ptr<SessionState> SessionManager::open( - SessionContext& h, uint32_t timeout_) + SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); std::auto_ptr<SessionState> session( diff --git a/qpid/cpp/src/qpid/broker/SessionManager.h b/qpid/cpp/src/qpid/broker/SessionManager.h index 94956a83ed..7e8bd18f57 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.h +++ b/qpid/cpp/src/qpid/broker/SessionManager.h @@ -38,7 +38,7 @@ namespace qpid { namespace broker { class SessionState; -class SessionContext; +class SessionHandler; /** * Create and manage SessionState objects. @@ -57,7 +57,7 @@ class SessionManager : private boost::noncopyable { ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_); + std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_); /** Suspend a session, start it's timeout counter. * The factory takes ownership. diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index b6c59cfb3b..573a567da6 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -19,12 +19,16 @@ * */ #include "SessionState.h" -#include "SessionManager.h" -#include "SessionContext.h" -#include "ConnectionState.h" #include "Broker.h" +#include "ConnectionState.h" +#include "MessageDelivery.h" #include "SemanticHandler.h" +#include "SessionManager.h" +#include "SessionHandler.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/ServerInvoker.h" + +#include <boost/bind.hpp> namespace qpid { namespace broker { @@ -37,17 +41,17 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack) + SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) : framing::SessionState(ack, timeout_ > 0), factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), version(h->getConnection().getVersion()), - semanticHandler(new SemanticHandler(*this)) + semanticState(*this, *this), + adapter(semanticState), + msgBuilder(&broker.getStore(), broker.getStagingThreshold()), + ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) { - in.next = semanticHandler.get(); - out.next = &handler->out; - - getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.addOutputTask(&semanticState); Manageable* parent = broker.GetVhostObject (); @@ -76,7 +80,7 @@ SessionState::~SessionState() { mgmtObject->resourceDestroy (); } -SessionContext* SessionState::getHandler() { +SessionHandler* SessionState::getHandler() { return handler; } @@ -91,20 +95,19 @@ ConnectionState& SessionState::getConnection() { } void SessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); + getConnection().outputTasks.removeOutputTask(&semanticState); Mutex::ScopedLock l(lock); - handler = 0; out.next = 0; + handler = 0; if (mgmtObject.get() != 0) { mgmtObject->set_attached (0); } } -void SessionState::attach(SessionContext& h) { +void SessionState::attach(SessionHandler& h) { { Mutex::ScopedLock l(lock); handler = &h; - out.next = &handler->out; if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); @@ -112,7 +115,7 @@ void SessionState::attach(SessionContext& h) { mgmtObject->set_channelId (h.getChannel()); } } - h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); + h.getConnection().outputTasks.addOutputTask(&semanticState); } void SessionState::activateOutput() @@ -165,5 +168,100 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } +void SessionState::handleCommand(framing::AMQMethodBody* method) +{ + SequenceNumber id = incoming.next(); + Invoker::Result invocation = invoke(adapter, *method); + incoming.complete(id); + + if (!invocation.wasHandled()) { + throw NotImplementedException("Not implemented"); + } else if (invocation.hasResult()) { + getProxy().getExecution().result(id.getValue(), invocation.getResult()); + } + if (method->isSync()) { + incoming.sync(id); + sendCompletion(); + } + //TODO: if window gets too large send unsolicited completion +} + +void SessionState::handleContent(AMQFrame& frame) +{ + intrusive_ptr<Message> msg(msgBuilder.getMessage()); + if (!msg) {//start of frameset will be indicated by frame flags + msgBuilder.start(incoming.next()); + msg = msgBuilder.getMessage(); + } + msgBuilder.handle(frame); + if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags + msg->setPublisher(&getConnection()); + semanticState.handle(msg); + msgBuilder.end(); + incoming.track(msg); + if (msg->getFrames().getMethod()->isSync()) { + incoming.sync(msg->getCommandId()); + sendCompletion(); + } + } +} + +void SessionState::handle(AMQFrame& frame) +{ + //TODO: make command handling more uniform, regardless of whether + //commands carry content. (For now, assume all single frame + //assmblies are non-content bearing and all content-bearing + //assmeblies will have more than one frame): + if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod()); + } else { + handleContent(frame); + } + +} + +DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) +{ + uint32_t maxFrameSize = getConnection().getFrameMax(); + MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); + return outgoing.hwm; +} + +void SessionState::sendCompletion() +{ + SequenceNumber mark = incoming.getMark(); + SequenceNumberSet range = incoming.getRange(); + getProxy().getExecution().complete(mark.getValue(), range); +} + +void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) +{ + //record: + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + //ack messages: + semanticState.ackCumulative(mark.getValue()); + } + range.processRanges(ackOp); +} + +void SessionState::flush() +{ + incoming.flush(); + sendCompletion(); +} + +void SessionState::sync() +{ + incoming.sync(); + sendCompletion(); +} + +void SessionState::noop() +{ + incoming.noop(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 8a12e580b7..98c21a8ab5 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -27,10 +27,15 @@ #include "qpid/framing/SessionState.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/OutputControl.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" #include "qpid/management/Session.h" +#include "BrokerAdapter.h" +#include "DeliveryAdapter.h" +#include "MessageBuilder.h" +#include "SessionContext.h" +#include "SemanticState.h" +#include "IncomingExecutionContext.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -47,8 +52,7 @@ class AMQP_ClientProxy; namespace broker { -class SemanticHandler; -class SessionContext; +class SessionHandler; class SessionManager; class Broker; class ConnectionState; @@ -58,8 +62,8 @@ class ConnectionState; * themselves have state. */ class SessionState : public framing::SessionState, - public framing::FrameHandler::Chains, - public sys::OutputControl, + public SessionContext, + public DeliveryAdapter, public management::Manageable { public: @@ -67,10 +71,10 @@ class SessionState : public framing::SessionState, bool isAttached() { return handler; } void detach(); - void attach(SessionContext& handler); + void attach(SessionHandler& handler); - SessionContext* getHandler(); + SessionHandler* getHandler(); /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); @@ -85,6 +89,19 @@ class SessionState : public framing::SessionState, /** OutputControl **/ void activateOutput(); + void handle(framing::AMQFrame& frame); + void handleCommand(framing::AMQMethodBody* method); + void handleContent(framing::AMQFrame& frame); + + void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); + void flush(); + void noop(); + void sync(); + void sendCompletion(); + + //delivery adapter methods: + DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); + // Manageable entry points management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t @@ -92,21 +109,32 @@ class SessionState : public framing::SessionState, // Normally SessionManager creates sessions. SessionState(SessionManager*, - SessionContext* out, + SessionHandler* out, uint32_t timeout, uint32_t ackInterval); private: + typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + SessionManager* factory; - SessionContext* handler; + SessionHandler* handler; framing::Uuid id; uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. Broker& broker; framing::ProtocolVersion version; sys::Mutex lock; - boost::scoped_ptr<SemanticHandler> semanticHandler; + + SemanticState semanticState; + BrokerAdapter adapter; + MessageBuilder msgBuilder; + + //execution state + IncomingExecutionContext incoming; + framing::Window outgoing; + RangedOperation ackOp; + management::Session::shared_ptr mgmtObject; friend class SessionManager; diff --git a/qpid/cpp/src/qpid/client/Session.h b/qpid/cpp/src/qpid/client/Session.h index 3293af60fe..5d91f289e2 100644 --- a/qpid/cpp/src/qpid/client/Session.h +++ b/qpid/cpp/src/qpid/client/Session.h @@ -27,7 +27,7 @@ namespace qpid { namespace client { /** - * Session is currently just an alias for Session_0_10 + * Session is currently just an alias for Session_99_0 * * \ingroup clientapi */ diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index bca6c49c13..5152aa2e43 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -17,7 +17,7 @@ */ #include "Cluster.h" -#include "qpid/broker/SessionState.h" +#include "qpid/broker/PreviewSessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -32,18 +32,18 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::SessionState; +using broker::PreviewSessionState; namespace { // Beginning of inbound chain: send to cluster. struct ClusterSendHandler : public FrameHandler { - SessionState& session; + PreviewSessionState& session; Cluster& cluster; bool busy; Monitor lock; - ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} + ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} void handle(AMQFrame& f) { Mutex::ScopedLock l(lock); @@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) { c.next = h; } -struct SessionObserver : public broker::SessionManager::Observer { +struct SessionObserver : public broker::PreviewSessionManager::Observer { Cluster& cluster; SessionObserver(Cluster& c) : cluster(c) {} - void opened(SessionState& s) { + void opened(PreviewSessionState& s) { // FIXME aconway 2008-01-29: IList for memory management. ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index b62b2be5f1..733db8003d 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -62,7 +62,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler virtual ~Cluster(); // FIXME aconway 2008-01-29: - intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; } + intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -116,7 +116,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - intrusive_ptr<broker::SessionManager::Observer> observer; + intrusive_ptr<broker::PreviewSessionManager::Observer> observer; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp index ceafa389b0..0ea3953175 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin { cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getSessionManager().add(cluster->getObserver()); + broker->getPreviewSessionManager().add(cluster->getObserver()); } } }; diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h index 1be8856c13..b15e14d6f6 100644 --- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h +++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -33,6 +33,7 @@ namespace qpid { namespace framing { static ProtocolVersion highestProtocolVersion(99, 0); +//static ProtocolVersion highestProtocolVersion(0, 10); } /* namespace framing */ } /* namespace qpid */ diff --git a/qpid/cpp/src/qpid/framing/Proxy.h b/qpid/cpp/src/qpid/framing/Proxy.h index b6ac897e96..86b99a83b0 100644 --- a/qpid/cpp/src/qpid/framing/Proxy.h +++ b/qpid/cpp/src/qpid/framing/Proxy.h @@ -39,6 +39,7 @@ class Proxy void send(const AMQBody&); ProtocolVersion getVersion() const; + FrameHandler& getHandler() { return out; } protected: FrameHandler& out; diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp index 700aeef47c..715cdaec2a 100644 --- a/qpid/cpp/src/tests/exception_test.cpp +++ b/qpid/cpp/src/tests/exception_test.cpp @@ -92,7 +92,7 @@ BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) { BOOST_CHECK_THROW(session.close(), InternalErrorException); } -BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) { +BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, ProxySessionFixture) { BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException); } diff --git a/qpid/cpp/src/tests/serialize.cpp b/qpid/cpp/src/tests/serialize.cpp index 72a92ee226..a120be6458 100644 --- a/qpid/cpp/src/tests/serialize.cpp +++ b/qpid/cpp/src/tests/serialize.cpp @@ -79,7 +79,7 @@ template <class A, class B, class C, class D> struct concat4 { typedef typename typedef mpl::vector<Bit, Boolean, Char, Int32, Int64, Int8, Uint16, CharUtf32, Uint32, Uint64, Bin8, Uint8>::type IntegralTypes; typedef mpl::vector<Bin1024, Bin128, Bin16, Bin256, Bin32, Bin40, Bin512, Bin64, Bin72>::type BinTypes; typedef mpl::vector<Double, Float>::type FloatTypes; -typedef mpl::vector<SequenceNo, Uuid, DateTime, Dec32, Dec64> FixedSizeClassTypes; +typedef mpl::vector<SequenceNo, Uuid, Datetime, Dec32, Dec64> FixedSizeClassTypes; typedef mpl::vector<Vbin8, Str8Latin, Str8, Str8Utf16, Vbin16, Str16Latin, Str16, Str16Utf16, Vbin32> VariableSizeTypes; @@ -106,7 +106,7 @@ BOOST_AUTO_TEST_CASE(testNetworkByteOrder) { void testValue(bool& b) { b = true; } template <class T> typename boost::enable_if<boost::is_arithmetic<T> >::type testValue(T& n) { n=42; } void testValue(long long& l) { l = 12345; } -void testValue(DateTime& dt) { dt = qpid::sys::now(); } +void testValue(Datetime& dt) { dt = qpid::sys::now(); } void testValue(Uuid& uuid) { uuid=Uuid(true); } template <class E, class M> void testValue(Decimal<E,M>& d) { d.exponent=2; d.mantissa=1234; } void testValue(SequenceNo& s) { s = 42; } diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index bbdb501b80..d1e3293a3e 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -20,7 +20,7 @@ - --> -<amqp major="0" minor="10" port="5672"> +<amqp major="99" minor="0" port="5672"> <class name = "cluster" index = "201"> |
