summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/configure.ac15
-rw-r--r--qpid/cpp/qpidc.spec.in1
-rwxr-xr-xqpid/cpp/rubygen/0-10/specification.rb177
-rwxr-xr-xqpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb (renamed from qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb)2
-rwxr-xr-xqpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb (renamed from qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb)2
-rwxr-xr-xqpid/cpp/rubygen/99-0/MethodHolder.rb (renamed from qpid/cpp/rubygen/templates/MethodHolder.rb)2
-rwxr-xr-xqpid/cpp/rubygen/99-0/Operations.rb (renamed from qpid/cpp/rubygen/templates/Operations.rb)4
-rwxr-xr-xqpid/cpp/rubygen/99-0/OperationsInvoker.rb (renamed from qpid/cpp/rubygen/templates/OperationsInvoker.rb)4
-rwxr-xr-xqpid/cpp/rubygen/99-0/Proxy.rb (renamed from qpid/cpp/rubygen/templates/Proxy.rb)8
-rw-r--r--qpid/cpp/rubygen/99-0/Session.rb (renamed from qpid/cpp/rubygen/templates/Session.rb)4
-rwxr-xr-xqpid/cpp/rubygen/99-0/all_method_bodies.rb (renamed from qpid/cpp/rubygen/templates/all_method_bodies.rb)2
-rwxr-xr-xqpid/cpp/rubygen/99-0/constants.rb (renamed from qpid/cpp/rubygen/templates/constants.rb)2
-rw-r--r--qpid/cpp/rubygen/99-0/frame_body_lists.rb (renamed from qpid/cpp/rubygen/templates/frame_body_lists.rb)2
-rw-r--r--qpid/cpp/rubygen/99-0/structs.rb (renamed from qpid/cpp/rubygen/templates/structs.rb)2
-rwxr-xr-xqpid/cpp/rubygen/MethodBodyDefaultVisitor.rb2
-rwxr-xr-xqpid/cpp/rubygen/amqpgen.rb139
-rwxr-xr-xqpid/cpp/rubygen/cppgen.rb104
-rwxr-xr-xqpid/cpp/rubygen/generate41
-rw-r--r--qpid/cpp/src/Makefile.am14
-rw-r--r--qpid/cpp/src/qpid/Exception.cpp27
-rw-r--r--qpid/cpp/src/qpid/Exception.h25
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/built_in_types.h23
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/helpers.cpp30
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/helpers.h67
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/visitors.h15
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h3
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.h2
-rw-r--r--qpid/cpp/src/qpid/broker/HandlerImpl.h6
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp20
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionHandler.h10
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp112
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionManager.h100
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionState.cpp169
-rw-r--r--qpid/cpp/src/qpid/broker/PreviewSessionState.h124
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticHandler.h8
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp31
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h8
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h9
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp63
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h15
-rw-r--r--qpid/cpp/src/qpid/broker/SessionManager.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionManager.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp128
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h48
-rw-r--r--qpid/cpp/src/qpid/client/Session.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h1
-rw-r--r--qpid/cpp/src/qpid/framing/Proxy.h1
-rw-r--r--qpid/cpp/src/tests/exception_test.cpp2
-rw-r--r--qpid/cpp/src/tests/serialize.cpp4
-rw-r--r--qpid/cpp/xml/cluster.xml2
55 files changed, 1391 insertions, 232 deletions
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac
index 3d68ed7890..c17051d431 100644
--- a/qpid/cpp/configure.ac
+++ b/qpid/cpp/configure.ac
@@ -122,11 +122,11 @@ test -n "$RUBY" && generate=yes
test -z "$RUBY" && AC_MSG_ERROR([Missing ruby installation (try "yum install ruby").])
specdir=`pwd`/$srcdir/../specs
-AMQP_XML=$specdir/amqp.0-10-preview.xml
-AC_SUBST(AMQP_XML)
-ls $AMQP_XML >/dev/null 2>&1 || generate=no
-
-AM_CONDITIONAL([GENERATE], [test x$generate = xyes])
+AMQP_PREVIEW_XML=$specdir/amqp.0-10-preview.xml
+AMQP_FINAL_XML=$specdir/amqp.0-10.xml
+AC_SUBST(AMQP_PREVIEW_XML)
+AC_SUBST(AMQP_FINAL_XML)
+AM_CONDITIONAL([GENERATE], [ls $AMQP_FINAL_XML >/dev/null])
# URL and download URL for the package.
URL=http://rhm.et.redhat.com/qpidc
@@ -139,7 +139,6 @@ AC_CHECK_HEADERS([boost/shared_ptr.hpp uuid/uuid.h],,
AC_MSG_ERROR([Missing required header files.]))
# Check for optional CPG requirement.
-save_ldflags=$LDFLAGS
LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais"
AC_ARG_WITH([cpg],
@@ -157,15 +156,13 @@ AC_ARG_WITH([cpg],
esac],
[ # not specified - enable if libs/headers available.
with_CPG=yes
- AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no])
AC_CHECK_HEADERS([openais/cpg.h],,[with_CPG=no])
+ AC_CHECK_LIB([cpg],[cpg_initialize],,[with_CPG=no])
]
)
AM_CONDITIONAL([CPG], [test x$with_CPG = xyes])
if test x$with_CPG = xyes; then
CPPFLAGS+=" -DCPG"
-else
- LDFLAGS=$save_ldflags
fi
# Files to generate
diff --git a/qpid/cpp/qpidc.spec.in b/qpid/cpp/qpidc.spec.in
index 167247f67b..fd4a24b2e5 100644
--- a/qpid/cpp/qpidc.spec.in
+++ b/qpid/cpp/qpidc.spec.in
@@ -104,6 +104,7 @@ make check
%files devel
%defattr(-,root,root,-)
%_includedir/qpid/*.h
+%_includedir/qpid/amqp_0_10
%_includedir/qpid/client
%_includedir/qpid/framing
%_includedir/qpid/sys
diff --git a/qpid/cpp/rubygen/0-10/specification.rb b/qpid/cpp/rubygen/0-10/specification.rb
new file mode 100755
index 0000000000..026b49e1a9
--- /dev/null
+++ b/qpid/cpp/rubygen/0-10/specification.rb
@@ -0,0 +1,177 @@
+#!/usr/bin/env ruby
+$: << ".." # Include .. in load path
+require 'cppgen'
+
+class Specification < CppGen
+ def initialize(outdir, amqp)
+ super(outdir, amqp)
+ @ns="qpid::amqp_#{@amqp.version.bars}"
+ @dir="qpid/amqp_#{@amqp.version.bars}"
+ end
+
+ # domains
+
+ def domain_h(d)
+ genl
+ typename=d.name.typename
+ if d.enum
+ scope("enum #{typename} {", "};") {
+ genl d.enum.choices.map { |c|
+ "#{c.name.constname} = #{c.value}" }.join(",\n")
+ }
+ elsif (d.type_ == "array")
+ genl "typedef Array<#{ArrayTypes[d.name].amqp2cpp}> #{typename};"
+ else
+ genl "typedef #{d.type_.amqp2cpp} #{typename};"
+ end
+ end
+
+ # class constants
+
+ def class_h(c)
+ genl "const uint8_t CODE=#{c.code};"
+ genl "extern const char* NAME;"
+ end
+
+ def class_cpp(c)
+ genl "const char* NAME=\"#{c.fqname}\";"
+ end
+
+ # Used by structs, commands and controls.
+ def action_struct_h(x, base, consts, &block)
+ genl
+ struct(x.classname, "public #{base}") {
+ x.fields.each { |f| genl "#{f.type_.amqp2cpp} #{f.cppname};" }
+ genl
+ genl "static const char* NAME;"
+ consts.each { |c| genl "static const uint8_t #{c.upcase}=#{x.send c or 0};"}
+ genl "static const uint8_t CLASS_CODE=#{x.containing_class.nsname}::CODE;"
+ genl
+ genl "#{x.classname}();"
+ scope("#{x.classname}(",");") { genl x.parameters } unless x.fields.empty?
+ genl
+ genl "void accept(Visitor&) const;"
+ genl
+ yield if block
+ }
+ end
+
+ def action_struct_cpp(x)
+ genl
+ genl "const char* #{x.classname}::NAME=\"#{x.fqname}\";"
+ genl
+ genl "#{x.classname}::#{x.classname}() {}";
+ genl
+ if not x.fields.empty?
+ scope("#{x.classname}::#{x.classname}(",") :") { genl x.parameters }
+ indent() { genl x.initializers }
+ genl "{}"
+ genl
+ end
+ scope("void #{x.classname}::accept(Visitor&) const {","}") {
+ genl "// FIXME aconway 2008-02-27: todo"
+ }
+ end
+
+ # structs
+
+ def struct_h(s) action_struct_h(s, "Struct", ["size","pack","code"]); end
+ def struct_cpp(s) action_struct_cpp(s) end
+
+ # command and control
+
+ def action_h(a)
+ action_struct_h(a, a.base, ["code"]) {
+ scope("template <class T> void invoke(T& target) {","}") {
+ scope("target.#{a.funcname}(", ");") { genl a.values }
+ }
+ genl
+ scope("template <class S> void serialize(S& s) {","}") {
+ gen "s"
+ a.fields.each { |f| gen "(#{f.cppname})"}
+ genl ";"
+ } unless a.fields.empty?
+ }
+ end
+
+ def action_cpp(a) action_struct_cpp(a); end
+
+ # Types that must be generated early because they are used by other types.
+ def pregenerate?(x) not @amqp.used_by[x.fqname].empty?; end
+
+ # Generate the log
+ def gen_specification()
+ h_file("#{@dir}/specification") {
+ include "#{@dir}/built_in_types"
+ include "#{@dir}/helpers"
+ include "<boost/call_traits.hpp>"
+ genl "using boost::call_traits;"
+ namespace(@ns) {
+ # Top level
+ @amqp.domains.each { |d|
+ # segment-type and track are are built in
+ domain_h d unless ["track","segment-type"].include?(d.name)
+ }
+ puts @amqp.used_by.inspect
+
+ # Domains and structs that must be generated early because
+ # they are used by other definitions:
+ each_class_ns { |c|
+ class_h c
+ c.domains.each { |d| domain_h d if pregenerate? d }
+ c.structs.each { |s| struct_h s if pregenerate? s }
+ }
+ # Now dependent domains/structs and actions
+ each_class_ns { |c|
+ c.domains.each { |d| domain_h d if not pregenerate? d }
+ c.structs.each { |s| struct_h s if not pregenerate? s }
+ c.actions.each { |a| action_h a }
+ }
+ }
+ }
+
+ cpp_file("#{@dir}/specification") {
+ include "#{@dir}/specification"
+ namespace(@ns) {
+ each_class_ns { |c|
+ class_cpp c
+ c.actions.each { |a| action_cpp a}
+ c.structs.each { |s| struct_cpp s }
+ }
+ }
+ }
+ end
+
+ def gen_proxy()
+ h_file("#{@dir}/Proxy.h") {
+ include "#{@dir}/specification"
+ namespace(@ns) {
+ genl "template <class F, class R=F::result_type>"
+ cpp_class("ProxyTemplate") {
+ public
+ genl "ProxyTemplate(F f) : functor(f) {}"
+ @amqp.classes.each { |c|
+ c.actions.each { |a|
+ scope("R #{a.funcname}(", ")") { genl a.parameters }
+ scope() {
+ var=a.name.funcname
+ scope("#{a.classname} #{var}(",");") { genl a.arguments }
+ genl "return functor(#{var});"
+ }
+ }
+ }
+ private
+ genl "F functor;"
+ }
+ }
+ }
+ end
+
+ def generate
+ gen_specification
+ gen_proxy
+ end
+end
+
+Specification.new($outdir, $amqp).generate();
+
diff --git a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb
index 18f74a2e41..f9ef95f5a0 100755
--- a/qpid/cpp/rubygen/templates/MethodBodyConstVisitor.rb
+++ b/qpid/cpp/rubygen/99-0/MethodBodyConstVisitor.rb
@@ -23,5 +23,5 @@ class MethodBodyConstVisitorGen < CppGen
end
end
-MethodBodyConstVisitorGen.new(Outdir, Amqp).generate();
+MethodBodyConstVisitorGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb
index 4944b10a84..a74b0c06d6 100755
--- a/qpid/cpp/rubygen/templates/MethodBodyDefaultVisitor.rb
+++ b/qpid/cpp/rubygen/99-0/MethodBodyDefaultVisitor.rb
@@ -31,5 +31,5 @@ class MethodBodyDefaultVisitorGen < CppGen
end
end
-MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate();
+MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/MethodHolder.rb b/qpid/cpp/rubygen/99-0/MethodHolder.rb
index 7e915677b2..a708db6676 100755
--- a/qpid/cpp/rubygen/templates/MethodHolder.rb
+++ b/qpid/cpp/rubygen/99-0/MethodHolder.rb
@@ -96,5 +96,5 @@ EOS
end
end
-MethodHolderGen.new(Outdir, Amqp).generate();
+MethodHolderGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/Operations.rb b/qpid/cpp/rubygen/99-0/Operations.rb
index 91007ef3e1..c985bb6105 100755
--- a/qpid/cpp/rubygen/templates/Operations.rb
+++ b/qpid/cpp/rubygen/99-0/Operations.rb
@@ -91,6 +91,6 @@ EOS
end
end
-OperationsGen.new("client",ARGV[0], Amqp).generate()
-OperationsGen.new("server",ARGV[0], Amqp).generate()
+OperationsGen.new("client",ARGV[0], $amqp).generate()
+OperationsGen.new("server",ARGV[0], $amqp).generate()
diff --git a/qpid/cpp/rubygen/templates/OperationsInvoker.rb b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb
index 747dd06189..642f98ce8e 100755
--- a/qpid/cpp/rubygen/templates/OperationsInvoker.rb
+++ b/qpid/cpp/rubygen/99-0/OperationsInvoker.rb
@@ -88,5 +88,5 @@ class OperationsInvokerGen < CppGen
end
end
-OperationsInvokerGen.new("client",ARGV[0], Amqp).generate()
-OperationsInvokerGen.new("server",ARGV[0], Amqp).generate()
+OperationsInvokerGen.new("client",ARGV[0], $amqp).generate()
+OperationsInvokerGen.new("server",ARGV[0], $amqp).generate()
diff --git a/qpid/cpp/rubygen/templates/Proxy.rb b/qpid/cpp/rubygen/99-0/Proxy.rb
index 467476506c..2829884673 100755
--- a/qpid/cpp/rubygen/templates/Proxy.rb
+++ b/qpid/cpp/rubygen/99-0/Proxy.rb
@@ -62,7 +62,9 @@ EOS
include "<sstream>"
include "#{@classname}.h"
include "qpid/framing/amqp_types_full.h"
- Amqp.methods_on(@chassis).each { |m| include "qpid/framing/"+m.body_name }
+ @amqp.methods_on(@chassis).each {
+ |m| include "qpid/framing/"+m.body_name
+ }
genl
namespace("qpid::framing") {
genl "#{@classname}::#{@classname}(FrameHandler& f) :"
@@ -75,6 +77,6 @@ EOS
end
-ProxyGen.new("client", Outdir, Amqp).generate;
-ProxyGen.new("server", Outdir, Amqp).generate;
+ProxyGen.new("client", $outdir, $amqp).generate;
+ProxyGen.new("server", $outdir, $amqp).generate;
diff --git a/qpid/cpp/rubygen/templates/Session.rb b/qpid/cpp/rubygen/99-0/Session.rb
index 6a50fdb462..e01a28a62d 100644
--- a/qpid/cpp/rubygen/templates/Session.rb
+++ b/qpid/cpp/rubygen/99-0/Session.rb
@@ -190,6 +190,6 @@ EOS
end
end
-SessionNoKeywordGen.new(ARGV[0], Amqp).generate()
-SessionGen.new(ARGV[0], Amqp).generate()
+SessionNoKeywordGen.new(ARGV[0], $amqp).generate()
+SessionGen.new(ARGV[0], $amqp).generate()
diff --git a/qpid/cpp/rubygen/templates/all_method_bodies.rb b/qpid/cpp/rubygen/99-0/all_method_bodies.rb
index d06f459493..5971d49189 100755
--- a/qpid/cpp/rubygen/templates/all_method_bodies.rb
+++ b/qpid/cpp/rubygen/99-0/all_method_bodies.rb
@@ -17,5 +17,5 @@ class AllMethodBodiesGen < CppGen
end
end
-AllMethodBodiesGen.new(Outdir, Amqp).generate();
+AllMethodBodiesGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/constants.rb b/qpid/cpp/rubygen/99-0/constants.rb
index 5fbbefe218..b5f559d504 100755
--- a/qpid/cpp/rubygen/templates/constants.rb
+++ b/qpid/cpp/rubygen/99-0/constants.rb
@@ -78,5 +78,5 @@ class ConstantsGen < CppGen
end
end
-ConstantsGen.new(Outdir, Amqp).generate();
+ConstantsGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/templates/frame_body_lists.rb b/qpid/cpp/rubygen/99-0/frame_body_lists.rb
index 634001ab14..b20e4550f3 100644
--- a/qpid/cpp/rubygen/templates/frame_body_lists.rb
+++ b/qpid/cpp/rubygen/99-0/frame_body_lists.rb
@@ -26,6 +26,6 @@ EOS
end
end
-FrameBodyListsGen.new(ARGV[0], Amqp).generate;
+FrameBodyListsGen.new(ARGV[0], $amqp).generate;
diff --git a/qpid/cpp/rubygen/templates/structs.rb b/qpid/cpp/rubygen/99-0/structs.rb
index edbadb01f6..336591be00 100644
--- a/qpid/cpp/rubygen/templates/structs.rb
+++ b/qpid/cpp/rubygen/99-0/structs.rb
@@ -534,5 +534,5 @@ EOS
end
end
-StructGen.new(ARGV[0], Amqp).generate()
+StructGen.new(ARGV[0], $amqp).generate()
diff --git a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb
index d1212153ca..1fff1d51db 100755
--- a/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb
+++ b/qpid/cpp/rubygen/MethodBodyDefaultVisitor.rb
@@ -30,5 +30,5 @@ class MethodBodyDefaultVisitorGen < CppGen
end
end
-MethodBodyDefaultVisitorGen.new(Outdir, Amqp).generate();
+MethodBodyDefaultVisitorGen.new($outdir, $amqp).generate();
diff --git a/qpid/cpp/rubygen/amqpgen.rb b/qpid/cpp/rubygen/amqpgen.rb
index f998909ec8..b1e635a27b 100755
--- a/qpid/cpp/rubygen/amqpgen.rb
+++ b/qpid/cpp/rubygen/amqpgen.rb
@@ -119,7 +119,16 @@ class AmqpElement
# List of children of type elname, or all children if elname
# not specified.
def children(elname=nil)
- @cache_children[elname] ||= @children.select { |c| elname==c.xml.name }
+ if elname
+ @cache_children[elname] ||= @children.select { |c| elname==c.xml.name }
+ else
+ @children
+ end
+ end
+
+ def each_descendant(&block)
+ yield self
+ @children.each { |c| c.each_descendant(&block) }
end
# Look up child of type elname with attribute name.
@@ -127,6 +136,9 @@ class AmqpElement
@cache_child[[elname,name]] ||= children(elname).find { |c| c.name==name }
end
+ # Fully qualified amqp dotted name of this element
+ def dotted_name() (parent ? parent.dotted_name+"." : "") + name; end
+
# The root <amqp> element.
def root() @root ||=parent ? parent.root : self; end
@@ -140,9 +152,22 @@ class AmqpElement
# Text of doc child if there is one.
def doc() d=xml.elements["doc"]; d and d.text; end
+ def fqname()
+ throw "fqname: #{self} #{parent.fqname} has no name" unless name
+ p=parent && parent.fqname
+ p ? p+"."+name : name;
+ end
+
+ def containing_class()
+ return self if is_a? AmqpClass
+ return parent && parent.containing_class
+ end
+
end
-AmqpResponse = AmqpElement
+class AmqpResponse < AmqpElement
+ def initialize(xml, parent) super; end
+end
class AmqpDoc < AmqpElement
def initialize(xml,parent) super; end
@@ -153,18 +178,32 @@ class AmqpChoice < AmqpElement
def initialize(xml,parent) super; end
amqp_attr_reader :name, :value
end
-
+
class AmqpEnum < AmqpElement
def initialize(xml,parent) super; end
amqp_child_reader :choice
end
+# 0-10 array domains are missing element type information, add it here.
+ArrayTypes={
+ "str16-array" => "str-16",
+ "amqp-host-array" => "connection.amqp-host-url",
+ "command-fragments" => "session.command-fragment",
+ "in-doubt" => "dtx.xid"
+}
+
class AmqpDomain < AmqpElement
- def initialize(xml, parent) super; end
+ def initialize(xml, parent)
+ super
+ root.used_by[uses].push(fqname) if uses and uses.index('.')
+ end
+
amqp_attr_reader :type
amqp_single_child_reader :struct # preview
amqp_single_child_reader :enum
+ def uses() type_=="array" ? ArrayTypes[name] : type_; end
+
def unalias()
d=self
while (d.type_ != d.name and root.domain(d.type_))
@@ -180,10 +219,13 @@ class AmqpException < AmqpElement
end
class AmqpField < AmqpElement
- def initialize(xml, amqp) super; end;
+ def initialize(xml, amqp)
+ super;
+ root.used_by[type_].push(parent.fqname) if type_ and type_.index('.')
+ end
+
def domain() root.domain(xml.attributes["domain"]); end
amqp_single_child_reader :struct # preview
- # FIXME aconway 2008-02-21: exceptions in fields - need to update c++ mapping.
amqp_child_reader :exception
amqp_attr_reader :type, :default, :code, :required
end
@@ -198,15 +240,11 @@ class AmqpConstant < AmqpElement
amqp_attr_reader :value, :class
end
-# FIXME aconway 2008-02-21:
-# class AmqpResponse < AmqpElement
-# def initialize(xml, parent) super; end
-# end
-
class AmqpResult < AmqpElement
def initialize(xml, parent) super; end
amqp_single_child_reader :struct # preview
amqp_attr_reader :type
+ def name() "result"; end
end
class AmqpEntry < AmqpElement
@@ -236,8 +274,8 @@ class AmqpStruct < AmqpElement
amqp_attr_reader :size, :code, :pack
amqp_child_reader :field
- alias :raw_pack :pack
# preview - preview code needs default "short" for pack.
+ alias :raw_pack :pack
def pack() raw_pack or (not parent.final? and "short"); end
def result?() parent.xml.name == "result"; end
def domain?() parent.xml.name == "domain"; end
@@ -265,17 +303,21 @@ class AmqpRole < AmqpElement
amqp_attr_reader :implement
end
-class AmqpControl < AmqpElement
+# Base class for command and control.
+class AmqpAction < AmqpElement
def initialize(xml,amqp) super; end
amqp_child_reader :implement, :field, :response
amqp_attr_reader :code
end
-class AmqpCommand < AmqpElement
+class AmqpControl < AmqpAction
+ def initialize(xml,amqp) super; end
+end
+
+class AmqpCommand < AmqpAction
def initialize(xml,amqp) super; end
- amqp_child_reader :implement, :field, :exception, :response
+ amqp_child_reader :exception
amqp_single_child_reader :result, :segments
- amqp_attr_reader :code
end
class AmqpClass < AmqpElement
@@ -296,12 +338,19 @@ class AmqpClass < AmqpElement
def l4?() # preview
!["connection", "session", "execution"].include?(name)
end
+
+ def actions() controls+commands; end
end
class AmqpType < AmqpElement
+ def initialize(xml,amqp) super; end
amqp_attr_reader :code, :fixed_width, :variable_width
end
+class AmqpXref < AmqpElement
+ def initialize(xml,amqp) super; end
+end
+
# AMQP root element.
class AmqpRoot < AmqpElement
amqp_attr_reader :major, :minor, :port, :comment
@@ -314,14 +363,16 @@ class AmqpRoot < AmqpElement
raise "No XML spec files." if specs.empty?
xml=parse(specs.shift)
specs.each { |s| xml_merge(xml, parse(s)) }
+ @used_by=Hash.new{ |h,k| h[k]=[] }
super(xml, nil)
end
+ attr_reader :used_by
+
+ def merge(root) xml_merge(xml, root.xml); end
+
def version() major + "-" + minor; end
- # Find a child node from a dotted amqp name, e.g. message.transfer
- def lookup(dotted_name) elements[dotted_name.gsub(/\./,"/")]; end
-
# preview - only struct child reader remains for new mapping
def domain_structs() domains.map{ |d| d.struct }.compact; end
def result_structs()
@@ -337,6 +388,8 @@ class AmqpRoot < AmqpElement
@methods_on[chassis] ||= classes.map { |c| c.methods_on(chassis) }.flatten
end
+ def fqname() nil; end
+
# TODO aconway 2008-02-21: methods by role.
private
@@ -353,7 +406,7 @@ end
# Collect information about generated files.
class GenFiles
@@files =[]
- def GenFiles.add(f) @@files << f; puts f; end
+ def GenFiles.add(f) @@files << f; end
def GenFiles.get() @@files; end
end
@@ -366,26 +419,38 @@ class Generator
def initialize (outdir, amqp)
@amqp=amqp
@outdir=outdir
- @prefix='' # For indentation or comments.
+ @prefix=[''] # For indentation or comments.
@indentstr=' ' # One indent level.
@outdent=2
- Pathname.new(@outdir).mkpath unless @outdir=="-" or File.directory?(@outdir)
+ Pathname.new(@outdir).mkpath unless @outdir=="-"
end
# Create a new file, set @out.
- def file(file)
+ def file(file, &block)
GenFiles.add file
if (@outdir != "-")
- path=Pathname.new "#{@outdir}/#{file}"
- path.parent.mkpath
- path.open('w') { |@out| yield }
+ @path=Pathname.new "#{@outdir}/#{file}"
+ @path.parent.mkpath
+ @out=String.new # Generate in memory first
+ if block then yield; endfile; end
+ end
+ end
+
+ def endfile()
+ if @outdir != "-"
+ if @path.exist? and @path.read == @out
+ puts "Skipped #{@path} - unchanged" # Dont generate if unchanged
+ else
+ @path.open('w') { |f| f << @out }
+ puts "Generated #{@path}"
+ end
end
end
# Append multi-line string to generated code, prefixing each line.
def gen (str)
str.each_line { |line|
- @out << @prefix unless @midline
+ @out << @prefix.last unless @midline
@out << line
@midline = nil
}
@@ -400,23 +465,25 @@ class Generator
end
# Generate code with added prefix.
- def prefix(add)
- save=@prefix
- @prefix+=add
- yield
- @prefix=save
+ def prefix(add, &block)
+ @prefix.push @prefix.last+add
+ if block then yield; endprefix; end
+ end
+
+ def endprefix()
+ @prefix.pop
end
# Generate indented code
def indent(n=1,&block) prefix(@indentstr * n,&block); end
+ alias :endindent :endprefix
# Generate outdented code
def outdent(&block)
- save=@prefix
- @prefix=@prefix[0...-2]
- yield
- @prefix=save
+ @prefix.push @prefix.last[0...-2]
+ if block then yield; endprefix; end
end
+ alias :endoutdent :endprefix
attr_accessor :out
end
diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb
index 60a653e18d..edee72e0bd 100755
--- a/qpid/cpp/rubygen/cppgen.rb
+++ b/qpid/cpp/rubygen/cppgen.rb
@@ -54,12 +54,31 @@ CppKeywords = Set.new(["and", "and_eq", "asm", "auto", "bitand",
CppMangle = CppKeywords+Set.new(["string"])
class String
- def cppsafe()
- CppMangle.include?(self) ? self+"_" : self
+ def cppsafe() CppMangle.include?(self) ? self+"_" : self; end
+
+ def amqp2cpp()
+ path=split(".")
+ name=path.pop
+ return name.typename if path.empty?
+ path.map! { |n| n.nsname }
+ return (path << name.caps).join("::")
end
+
+ alias :typename :caps
+ alias :nsname :bars
+ alias :constname :shout
+ alias :funcname :lcaps
+ alias :varname :lcaps
end
# Hold information about a C++ type.
+#
+# preview - new mapping does not use CppType,
+# Each amqp type corresponds exactly by dotted name
+# to a type, domain or struct, which in turns
+# corresponds by name to a C++ type or typedef.
+# (see String.amqp2cpp)
+#
class CppType
def initialize(name) @name=@param=@ret=name; end
attr_reader :name, :param, :ret, :code
@@ -93,11 +112,22 @@ class CppType
def to_s() name; end;
end
+class AmqpElement
+ def cppfqname()
+ names=parent.dotted_name.split(".")
+ # Field children are moved up to method in C++b
+ prefix.pop if parent.is_a? AmqpField
+ prefix.push cppname
+ prefix.join("::")
+ end
+end
+
class AmqpField
def cppname() name.lcaps.cppsafe; end
def cpptype() domain.cpptype; end
def bit?() domain.type_ == "bit"; end
def signature() cpptype.param+" "+cppname; end
+ def paramtype() "call_traits<#{type_.amqp2cpp}>::param_type"; end
end
class AmqpMethod
@@ -107,8 +137,41 @@ class AmqpMethod
def body_name() parent.name.caps+name.caps+"Body"; end
end
+module AmqpHasFields
+ def parameters()
+ fields.map { |f| "#{f.paramtype} #{f.cppname}_"}.join(",\n")
+ end
+
+ def arguments()
+ fields.map { |f| "#{f.cppname}_"}.join(",\n")
+ end
+
+ def values()
+ fields.map { |f| "#{f.cppname}"}.join(",\n")
+ end
+
+ def initializers()
+ fields.map { |f| "#{f.cppname}(#{f.cppname}_)"}.join(",\n")
+ end
+end
+
+class AmqpAction
+ def classname() name.typename; end
+ def funcname() parent.name.funcname + name.caps; end
+ include AmqpHasFields
+end
+
+class AmqpCommand < AmqpAction
+ def base() "Command"; end
+end
+
+class AmqpControl < AmqpAction
+ def base() "Control"; end
+end
+
class AmqpClass
- def cppname() name.caps; end
+ def cppname() name.caps; end # preview
+ def nsname() name.nsname; end
end
class AmqpDomain
@@ -150,18 +213,26 @@ class AmqpResult
end
class AmqpStruct
- def cpp_pack_type() AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t"); end
- def cpptype() parent.cpptype; end
- def cppname() cpptype.name; end
+ include AmqpHasFields
+
+ def cpp_pack_type() # preview
+ AmqpDomain.lookup_type(pack()) or CppType.new("uint16_t");
+ end
+ def cpptype() parent.cpptype; end # preview
+ def cppname() cpptype.name; end # preview
+
+ def classname() name.typename; end
end
class CppGen < Generator
def initialize(outdir, *specs)
super(outdir,*specs)
+ # need to sort classes for dependencies
+ @actions=[] # Stack of end-scope actions
end
# Write a header file.
- def h_file(path)
+ def h_file(path, &block)
path = (/\.h$/ === path ? path : path+".h")
guard=path.upcase.tr('./-','_')
file(path) {
@@ -169,12 +240,12 @@ class CppGen < Generator
gen "#define #{guard}\n"
gen Copyright
yield
- gen "#endif /*!#{guard}*/\n"
+ gen "#endif /*!#{guard}*/\n"
}
end
# Write a .cpp file.
- def cpp_file(path)
+ def cpp_file(path, &block)
path = (/\.cpp$/ === path ? path : path+".cpp")
file(path) do
gen Copyright
@@ -188,10 +259,12 @@ class CppGen < Generator
genl "#include #{header}"
end
- def scope(open="{",close="}", &block)
- genl open; indent(&block); genl close
+ def scope(open="{",close="}", &block)
+ genl open
+ indent &block
+ genl close
end
-
+
def namespace(name, &block)
genl
names = name.split("::")
@@ -210,7 +283,7 @@ class CppGen < Generator
indent { gen "#{bases.join(",\n")}" }
end
genl
- scope("{","};") { yield }
+ scope("{","};", &block)
end
def struct(name, *bases, &block)
@@ -242,6 +315,11 @@ class CppGen < Generator
prefix(" * ",&block)
genl " */"
end
+
+ # Generate code in namespace for each class
+ def each_class_ns()
+ @amqp.classes.each { |c| namespace(c.nsname) { yield c } }
+ end
end
# Fully-qualified class name
diff --git a/qpid/cpp/rubygen/generate b/qpid/cpp/rubygen/generate
index 94b194aaa4..d094be4f41 100755
--- a/qpid/cpp/rubygen/generate
+++ b/qpid/cpp/rubygen/generate
@@ -1,5 +1,6 @@
#!/usr/bin/env ruby
require 'amqpgen'
+require 'pathname'
#
# Run a set of code generation templates.
@@ -18,17 +19,39 @@ EOS
exit 1
end
-Outdir=ARGV[0]
-Specs=ARGV.grep(/\.xml$/)
-Amqp=AmqpRoot.new(*Specs)
+# Create array of specs by version
+def parse_specs(specs)
+ roots={ }
+ specs.each { |spec|
+ root=AmqpRoot.new(spec)
+ ver=root.version
+ if (roots[ver])
+ roots[ver].merge(root)
+ else
+ roots[ver]=root
+ end
+ }
+ roots
+end
# Run selected templates
if ARGV.any? { |arg| arg=="all" }
- templates=Dir["#{File.dirname __FILE__}/templates/*.rb"]
+ templates=Dir["#{File.dirname __FILE__}/*/*.rb"]
else
templates=ARGV.grep(/\.rb$/)
end
-templates.each { |t| load t }
+
+$outdir=ARGV[0]
+$models=parse_specs(ARGV.grep(/\.xml$/))
+templates.each { |t|
+ ver=Pathname.new(t).dirname.basename.to_s
+ $amqp=$models[ver]
+ if $amqp
+ load t
+ else
+ puts "Warning: skipping #{t}, no spec file for version #{ver}."
+ end
+}
def make_continue(lines) lines.join(" \\\n "); end
@@ -40,7 +63,7 @@ if makefile
generator_files=Dir["**/*.rb"] << File.basename(__FILE__)
Dir.chdir dir
rgen_generator=generator_files.map{ |f| "$(rgen_dir)/#{f}" }
- rgen_srcs=GenFiles.get.map{ |f| "#{Outdir}/#{f}" }
+ rgen_srcs=GenFiles.get.map{ |f| "#{$outdir}/#{f}" }
File.open(makefile, 'w') { |out|
out << <<EOS
@@ -51,13 +74,13 @@ rgen_generator=#{make_continue rgen_generator}
rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))}
-rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))}
+rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r{qpid/(framing|amqp_.+)/.+\.cpp$}))}
rgen_srcs=#{make_continue rgen_srcs}
# Header file install rules.
EOS
- ["framing", "client/no_keyword","client", "broker"].each { |ns|
+ ["amqp_0_10", "framing", "client/no_keyword","client", "broker"].each { |ns|
dir="qpid/#{ns}"
dir_ = dir.tr("/", "_")
regex=%r|#{dir}/[^/]+\.h$|
@@ -69,7 +92,7 @@ EOS
}
out << <<EOS
if GENERATE
-$(rgen_srcs) $(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs)
+$(srcdir)/#{File.basename makefile}: $(rgen_generator) $(specs)
$(rgen_cmd)
# Empty rule in case a generator file is renamed/removed.
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index b44aaa7e5e..e3b95e045f 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -11,11 +11,13 @@ EXTRA_DIST= $(platform_dist)
# This phony target is needed by generated makefile fragments:
force:
-# AMQP_XML is defined in ../configure.ac
-specs=@AMQP_XML@ $(top_srcdir)/xml/cluster.xml
-
if GENERATE
+# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac
+amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml
+amqp_0_10_xml=@AMQP_FINAL_XML@
+specs=$(amqp_99_0_xml) $(amqp_0_10_xml)
+
# Ruby generator.
rgen_dir=$(top_srcdir)/rubygen
rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)/generate $(srcdir)/gen $(specs) all $(srcdir)/rubygen.mk
@@ -101,6 +103,7 @@ libqpidcommon_la_LIBADD = \
libqpidcommon_la_SOURCES = \
$(rgen_common_cpp) \
$(platform_src) \
+ qpid/amqp_0_10/helpers.cpp \
qpid/Serializer.h \
qpid/amqp_0_10/built_in_types.h \
qpid/amqp_0_10/Codec.h \
@@ -166,6 +169,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PreviewConnection.cpp \
qpid/broker/PreviewConnectionHandler.cpp \
qpid/broker/PreviewSessionHandler.cpp \
+ qpid/broker/PreviewSessionManager.cpp \
+ qpid/broker/PreviewSessionState.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -247,6 +252,7 @@ libqpidclient_la_SOURCES = \
nobase_include_HEADERS = \
$(platform_hdr) \
+ qpid/amqp_0_10/helpers.h \
qpid/assert.h \
qpid/DataDir.h \
qpid/Exception.h \
@@ -270,6 +276,8 @@ nobase_include_HEADERS = \
qpid/broker/PreviewConnection.h \
qpid/broker/PreviewConnectionHandler.h \
qpid/broker/PreviewSessionHandler.h \
+ qpid/broker/PreviewSessionManager.h \
+ qpid/broker/PreviewSessionState.h \
qpid/broker/Connection.h \
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionFactory.h \
diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp
index 07f157cfc3..17f0d5029c 100644
--- a/qpid/cpp/src/qpid/Exception.cpp
+++ b/qpid/cpp/src/qpid/Exception.cpp
@@ -32,20 +32,31 @@ std::string strError(int err) {
return std::string(strerror_r(err, buf, sizeof(buf)));
}
-Exception::Exception(const std::string& s) throw() : msg(s) {
- QPID_LOG(warning, "Exception: " << msg);
+Exception::Exception(const std::string& msg,
+ const std::string& nm,
+ uint16_t cd) throw()
+ : message(msg), name(nm), code(cd),
+ whatStr((name.empty() ? "" : name + ": ")+ msg)
+{
+ QPID_LOG(warning, "Exception: " << whatStr);
}
Exception::~Exception() throw() {}
-std::string Exception::str() const throw() {
- if (msg.empty())
- const_cast<std::string&>(msg).assign(typeid(*this).name());
- return msg;
+std::string Exception::getMessage() const throw() { return message; }
+
+std::string Exception::getName() const throw() {
+ return name.empty() ? typeid(*this).name() : name;
}
-const char* Exception::what() const throw() { return str().c_str(); }
+uint16_t Exception::getCode() const throw() { return code; }
+
+const char* Exception::what() const throw() {
+ if (whatStr.empty()) return typeid(*this).name();
+ else return whatStr.c_str();
+}
-const std::string ClosedException::CLOSED_MESSAGE("Closed");
+ClosedException::ClosedException(const std::string& msg)
+ : Exception(msg, "ClosedException") {}
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/Exception.h b/qpid/cpp/src/qpid/Exception.h
index 57c7a21234..00365018ba 100644
--- a/qpid/cpp/src/qpid/Exception.h
+++ b/qpid/cpp/src/qpid/Exception.h
@@ -40,13 +40,27 @@ std::string strError(int err);
class Exception : public std::exception
{
public:
- explicit Exception(const std::string& str=std::string()) throw();
+ explicit Exception(const std::string& message=std::string(),
+ const std::string& name=std::string(),
+ uint16_t code=0) throw();
+
virtual ~Exception() throw();
+
+ // returns "name: message"
+ virtual const char* what() const throw();
+
+ virtual std::string getName() const throw();
+ virtual std::string getMessage() const throw();
+ virtual uint16_t getCode() const throw();
+
+ // FIXME aconway 2008-02-21: backwards compat, remove?
+ std::string str() const throw() { return getMessage(); }
- virtual const char *what() const throw();
- virtual std::string str() const throw();
private:
- std::string msg;
+ const std::string message;
+ const std::string name;
+ const uint16_t code;
+ const std::string whatStr;
};
struct ChannelException : public Exception {
@@ -62,8 +76,7 @@ struct ConnectionException : public Exception {
};
struct ClosedException : public Exception {
- static const std::string CLOSED_MESSAGE;
- ClosedException(const std::string& msg=CLOSED_MESSAGE) : Exception(msg) {}
+ ClosedException(const std::string& msg=std::string());
};
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h
index 6cd9c72367..445f07459c 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/built_in_types.h
@@ -29,6 +29,7 @@
#include <boost/array.hpp>
#include <stdint.h>
#include <string>
+#include <vector>
/**@file Mapping from built-in AMQP types to C++ types */
@@ -66,7 +67,7 @@ typedef double Double;
typedef float Float;
typedef framing::SequenceNumber SequenceNo;
using framing::Uuid;
-typedef sys::AbsTime DateTime;
+typedef sys::AbsTime Datetime;
typedef Decimal<Uint8, Int32> Dec32;
typedef Decimal<Uint8, Int64> Dec64;
@@ -89,14 +90,18 @@ typedef CodableString<Uint16, Uint16> Str16Utf16;
typedef CodableString<Uint8, Uint32> Vbin32;
-/** FIXME aconway 2008-02-20: todo
-byte-ranges 2 byte ranges within a 64-bit payload
-sequence-set 2 ranged set representation
-map 0xa8 4 a mapping of keys to typed values
-list 0xa9 4 a series of consecutive type-value pairs
-array 0xaa 4 a defined length collection of values of a single type
-struct32 0xab 4 a coded struct with a 32-bit size
-*/
+// FIXME aconway 2008-02-26: array encoding
+template <class T> struct Array : public std::vector<T> {};
+
+// FIXME aconway 2008-02-26: Unimplemented types:
+struct ByteRanges {};
+struct SequenceSet {};
+struct Map {};
+struct List {};
+struct Struct32 {};
+
+// Top level enum definitions.
+enum SegmentType { CONTROL, COMMAND, HEADER, BODY };
}} // namespace qpid::amqp_0_10
diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp
new file mode 100644
index 0000000000..4333a2cd92
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "helpers.h"
+
+namespace qpid {
+namespace amqp_0_10 {
+
+Control::~Control() {}
+Command::~Command() {}
+Struct::~Struct() {}
+
+}} // namespace qpid::amqp_0_10
diff --git a/qpid/cpp/src/qpid/amqp_0_10/helpers.h b/qpid/cpp/src/qpid/amqp_0_10/helpers.h
new file mode 100644
index 0000000000..1769d374d9
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp_0_10/helpers.h
@@ -0,0 +1,67 @@
+#ifndef QPID_AMQP_0_10_HELPERS_H
+#define QPID_AMQP_0_10_HELPERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+n * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+
+namespace qpid {
+
+namespace amqp_0_10 {
+
+// Look up names by code
+const char* getClassName(uint8_t code);
+const char* getCommandName(uint8_t classCode, uint8_t code);
+const char* getControlName(uint8_t classCode, uint8_t code);
+const char* getStructName(uint8_t classCode, uint8_t code);
+
+struct Command {
+ virtual ~Command();
+ class Visitor;
+ virtual void accept(Visitor&) const = 0;
+};
+
+struct Control {
+ virtual ~Control();
+ class Visitor;
+ virtual void accept(Visitor&) const = 0;
+};
+
+struct Struct {
+ virtual ~Struct();
+ class Visitor;
+ virtual void accept(Visitor&) const = 0;
+};
+
+/** Base class for generated enum domains.
+ * Enums map to classes for type safety and to provide separate namespaces
+ * for clashing values.
+ */
+struct Enum {
+ int value;
+ Enum(int v=0) : value(v) {}
+ operator int() const { return value; }
+ template <class S> void serialize(S &s) { s(value); }
+};
+
+}} // namespace qpid::amqp_0_10
+
+#endif /*!QPID_AMQP_0_10_HELPERS_H*/
diff --git a/qpid/cpp/src/qpid/amqp_0_10/visitors.h b/qpid/cpp/src/qpid/amqp_0_10/visitors.h
new file mode 100644
index 0000000000..3835f37f3e
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp_0_10/visitors.h
@@ -0,0 +1,15 @@
+// Visitors
+template <class Base> struct Visitor;
+template <class Base, class F, class R> FunctorVisitor;
+
+/** Template base implementation for visitables. */
+template <class Base, class Derived>
+struct VisitableBase : public Base {
+ virtual void accept(Visitor<Derived>& v) {
+ v.visit(static_cast<Derived>&(*this));
+ }
+ virtual void accept(Visitor<Derived>& v) const {
+ v.visit(static_cast<const Derived>&(*this));
+ }
+};
+
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 0a0eb0a0df..9bfa868d9c 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -109,7 +109,8 @@ Broker::Broker(const Broker::Options& conf) :
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
factory(*this),
- sessionManager(conf.ack)
+ sessionManager(conf.ack),
+ previewSessionManager(conf.ack)
{
// Early-Initialize plugins
const Plugin::Plugins& plugins=Plugin::getPlugins();
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 55bc7644a5..153eabc6b3 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -30,6 +30,7 @@
#include "MessageStore.h"
#include "QueueRegistry.h"
#include "SessionManager.h"
+#include "PreviewSessionManager.h"
#include "Vhost.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
@@ -109,6 +110,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DataDir& getDataDir() { return dataDir; }
SessionManager& getSessionManager() { return sessionManager; }
+ PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
@@ -136,6 +138,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
+ PreviewSessionManager previewSessionManager;
management::ManagementAgent::shared_ptr managementAgent;
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h
index c8c1e12f28..ef2c51bb8d 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h
@@ -68,7 +68,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
- framing::ProtocolVersion getVersion() const { return session.getVersion();}
+ framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
AccessHandler* getAccessHandler() {
diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h
index 410d400c9d..4c51e2a826 100644
--- a/qpid/cpp/src/qpid/broker/HandlerImpl.h
+++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h
@@ -20,7 +20,7 @@
*/
#include "SemanticState.h"
-#include "SessionState.h"
+#include "SessionContext.h"
#include "ConnectionState.h"
namespace qpid {
@@ -35,13 +35,13 @@ class Broker;
class HandlerImpl {
protected:
SemanticState& state;
- SessionState& session;
+ SessionContext& session;
HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
ConnectionState& getConnection() { return session.getConnection(); }
- Broker& getBroker() { return session.getBroker(); }
+ Broker& getBroker() { return session.getConnection().getBroker(); }
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
index 19e6a235c4..36092bb7f6 100644
--- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.cpp
@@ -19,7 +19,7 @@
*/
#include "PreviewSessionHandler.h"
-#include "SessionState.h"
+#include "PreviewSessionState.h"
#include "PreviewConnection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
@@ -36,7 +36,7 @@ using namespace std;
using namespace qpid::sys;
PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
- : SessionContext(c.getOutput()),
+ : InOutHandler(0, &out),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -106,15 +106,15 @@ void PreviewSessionHandler::assertClosed(const char* method) const {
void PreviewSessionHandler::open(uint32_t detachedLifetime) {
assertClosed("open");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
+ std::auto_ptr<PreviewSessionState> state(
+ connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
peerSession.attached(session->getId(), session->getTimeout());
}
void PreviewSessionHandler::resume(const Uuid& id) {
assertClosed("resume");
- session = connection.broker.getSessionManager().resume(id);
+ session = connection.broker.getPreviewSessionManager().resume(id);
session->attach(*this);
SequenceNumber seq = session->resuming();
peerSession.attached(session->getId(), session->getTimeout());
@@ -154,7 +154,7 @@ void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText)
void PreviewSessionHandler::localSuspend() {
if (session.get() && session->isAttached()) {
session->detach();
- connection.broker.getSessionManager().suspend(session);
+ connection.broker.getPreviewSessionManager().suspend(session);
session.reset();
}
}
@@ -171,7 +171,7 @@ void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark,
const SequenceNumberSet& /*seenFrameSet*/)
{
assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
+ if (session->getState() == PreviewSessionState::RESUMING) {
session->receivedAck(cumulativeSeenMark);
framing::SessionState::Replay replay=session->replay();
std::for_each(replay.begin(), replay.end(),
@@ -193,14 +193,14 @@ void PreviewSessionHandler::solicitAck() {
void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
{
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
+ std::auto_ptr<PreviewSessionState> state(
+ connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
}
void PreviewSessionHandler::detached()
{
- connection.broker.getSessionManager().suspend(session);
+ connection.broker.getPreviewSessionManager().suspend(session);
session.reset();
}
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
index e1096ebf9f..4c517367d7 100644
--- a/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionHandler.h
@@ -36,7 +36,7 @@ namespace qpid {
namespace broker {
class PreviewConnection;
-class SessionState;
+class PreviewSessionState;
/**
* A SessionHandler is associated with each active channel. It
@@ -45,7 +45,7 @@ class SessionState;
*/
class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::AMQP_ClientOperations::SessionHandler,
- public SessionContext,
+ public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
public:
@@ -53,8 +53,8 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand
~PreviewSessionHandler();
/** Returns 0 if not attached to a session */
- SessionState* getSession() { return session.get(); }
- const SessionState* getSession() const { return session.get(); }
+ PreviewSessionState* getSession() { return session.get(); }
+ const PreviewSessionState* getSession() const { return session.get(); }
framing::ChannelId getChannel() const { return channel.get(); }
@@ -101,7 +101,7 @@ class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHand
framing::AMQP_ClientProxy proxy;
framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
- std::auto_ptr<SessionState> session;
+ std::auto_ptr<PreviewSessionState> session;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
new file mode 100644
index 0000000000..ec73082817
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.cpp
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "PreviewSessionManager.h"
+#include "PreviewSessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include "qpid/log/Helpers.h"
+#include "qpid/memory.h"
+
+#include <boost/bind.hpp>
+#include <boost/range.hpp>
+
+#include <algorithm>
+#include <functional>
+#include <ostream>
+
+namespace qpid {
+namespace broker {
+
+using namespace sys;
+using namespace framing;
+
+PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {}
+
+PreviewSessionManager::~PreviewSessionManager() {}
+
+// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
+std::auto_ptr<PreviewSessionState> PreviewSessionManager::open(
+ PreviewSessionHandler& h, uint32_t timeout_)
+{
+ Mutex::ScopedLock l(lock);
+ std::auto_ptr<PreviewSessionState> session(
+ new PreviewSessionState(this, &h, timeout_, ack));
+ active.insert(session->getId());
+ for_each(observers.begin(), observers.end(),
+ boost::bind(&Observer::opened, _1,boost::ref(*session)));
+ return session;
+}
+
+void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) {
+ Mutex::ScopedLock l(lock);
+ active.erase(session->getId());
+ session->suspend();
+ session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
+ if (session->mgmtObject.get() != 0)
+ session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
+ suspended.push_back(session.release()); // In expiry order
+ eraseExpired();
+}
+
+std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id)
+{
+ Mutex::ScopedLock l(lock);
+ eraseExpired();
+ if (active.find(id) != active.end())
+ throw SessionBusyException(
+ QPID_MSG("Session already active: " << id));
+ Suspended::iterator i = std::find_if(
+ suspended.begin(), suspended.end(),
+ boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1))
+ );
+ if (i == suspended.end())
+ throw InvalidArgumentException(
+ QPID_MSG("No suspended session with id=" << id));
+ active.insert(id);
+ std::auto_ptr<PreviewSessionState> state(suspended.release(i).release());
+ return state;
+}
+
+void PreviewSessionManager::erase(const framing::Uuid& id)
+{
+ Mutex::ScopedLock l(lock);
+ active.erase(id);
+}
+
+void PreviewSessionManager::eraseExpired() {
+ // Called with lock held.
+ if (!suspended.empty()) {
+ Suspended::iterator keep = std::lower_bound(
+ suspended.begin(), suspended.end(), now(),
+ boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2));
+ if (suspended.begin() != keep) {
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+ suspended.erase(suspended.begin(), keep);
+ }
+ }
+}
+
+void PreviewSessionManager::add(const intrusive_ptr<Observer>& o) {
+ observers.push_back(o);
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionManager.h b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
new file mode 100644
index 0000000000..65ca49ec89
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionManager.h
@@ -0,0 +1,100 @@
+#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H
+#define QPID_BROKER_PREVIEWSESSIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/framing/Uuid.h>
+#include <qpid/sys/Time.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/RefCounted.h>
+
+#include <boost/noncopyable.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include <set>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+class PreviewSessionState;
+class PreviewSessionHandler;
+
+/**
+ * Create and manage PreviewSessionState objects.
+ */
+class PreviewSessionManager : private boost::noncopyable {
+ public:
+ /**
+ * Observer notified of PreviewSessionManager events.
+ */
+ struct Observer : public RefCounted {
+ virtual void opened(PreviewSessionState&) {}
+ };
+
+ PreviewSessionManager(uint32_t ack);
+
+ ~PreviewSessionManager();
+
+ /** Open a new active session, caller takes ownership */
+ std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_);
+
+ /** Suspend a session, start it's timeout counter.
+ * The factory takes ownership.
+ */
+ void suspend(std::auto_ptr<PreviewSessionState> session);
+
+ /** Resume a suspended session.
+ *@throw Exception if timed out or non-existant.
+ */
+ std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&);
+
+ /** Add an Observer. */
+ void add(const intrusive_ptr<Observer>&);
+
+ private:
+ typedef boost::ptr_vector<PreviewSessionState> Suspended;
+ typedef std::set<framing::Uuid> Active;
+ typedef std::vector<intrusive_ptr<Observer> > Observers;
+
+ void erase(const framing::Uuid&);
+ void eraseExpired();
+
+ sys::Mutex lock;
+ Suspended suspended;
+ Active active;
+ uint32_t ack;
+ Observers observers;
+
+ friend class PreviewSessionState; // removes deleted sessions from active set.
+};
+
+
+
+}} // namespace qpid::broker
+
+
+
+
+
+#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
new file mode 100644
index 0000000000..7188ffbf40
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
@@ -0,0 +1,169 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewSessionState.h"
+#include "PreviewSessionManager.h"
+#include "PreviewSessionHandler.h"
+#include "ConnectionState.h"
+#include "Broker.h"
+#include "SemanticHandler.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+using sys::Mutex;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+
+PreviewSessionState::PreviewSessionState(
+ PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack)
+ : framing::SessionState(ack, timeout_ > 0),
+ factory(f), handler(h), id(true), timeout(timeout_),
+ broker(h->getConnection().broker),
+ version(h->getConnection().getVersion()),
+ semanticHandler(new SemanticHandler(*this))
+{
+ in.next = semanticHandler.get();
+ out.next = &handler->out;
+
+ getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+
+ Manageable* parent = broker.GetVhostObject ();
+
+ if (parent != 0)
+ {
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+
+ if (agent.get () != 0)
+ {
+ mgmtObject = management::Session::shared_ptr
+ (new management::Session (this, parent, id.str ()));
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h->getChannel());
+ mgmtObject->set_detachedLifespan (getTimeout());
+ agent->addObject (mgmtObject);
+ }
+ }
+}
+
+PreviewSessionState::~PreviewSessionState() {
+ // Remove ID from active session list.
+ if (factory)
+ factory->erase(getId());
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+PreviewSessionHandler* PreviewSessionState::getHandler() {
+ return handler;
+}
+
+AMQP_ClientProxy& PreviewSessionState::getProxy() {
+ assert(isAttached());
+ return getHandler()->getProxy();
+}
+
+ConnectionState& PreviewSessionState::getConnection() {
+ assert(isAttached());
+ return getHandler()->getConnection();
+}
+
+void PreviewSessionState::detach() {
+ getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+ Mutex::ScopedLock l(lock);
+ handler = 0; out.next = 0;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (0);
+ }
+}
+
+void PreviewSessionState::attach(PreviewSessionHandler& h) {
+ {
+ Mutex::ScopedLock l(lock);
+ handler = &h;
+ out.next = &handler->out;
+ if (mgmtObject.get() != 0)
+ {
+ mgmtObject->set_attached (1);
+ mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h.getChannel());
+ }
+ }
+ h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+}
+
+void PreviewSessionState::activateOutput()
+{
+ Mutex::ScopedLock l(lock);
+ if (isAttached()) {
+ getConnection().outputTasks.activateOutput();
+ }
+}
+ //This class could be used as the callback for queue notifications
+ //if not attached, it can simply ignore the callback, else pass it
+ //on to the connection
+
+ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId,
+ Args& /*args*/)
+{
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+ switch (methodId)
+ {
+ case management::Session::METHOD_DETACH :
+ if (handler != 0)
+ {
+ handler->detach();
+ }
+ status = Manageable::STATUS_OK;
+ break;
+
+ case management::Session::METHOD_CLOSE :
+ /*
+ if (handler != 0)
+ {
+ handler->getConnection().closeChannel(handler->getChannel());
+ }
+ status = Manageable::STATUS_OK;
+ break;
+ */
+
+ case management::Session::METHOD_SOLICITACK :
+ case management::Session::METHOD_RESETLIFESPAN :
+ status = Manageable::STATUS_NOT_IMPLEMENTED;
+ break;
+ }
+
+ return status;
+}
+
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PreviewSessionState.h b/qpid/cpp/src/qpid/broker/PreviewSessionState.h
new file mode 100644
index 0000000000..6e8523317c
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PreviewSessionState.h
@@ -0,0 +1,124 @@
+#ifndef QPID_BROKER_PREVIEWSESSION_H
+#define QPID_BROKER_PREVIEWSESSION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SessionState.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Time.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/management/Session.h"
+#include "SessionContext.h"
+
+#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <set>
+#include <vector>
+#include <ostream>
+
+namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
+namespace broker {
+
+class SemanticHandler;
+class PreviewSessionHandler;
+class PreviewSessionManager;
+class Broker;
+class ConnectionState;
+
+/**
+ * Broker-side session state includes sessions handler chains, which may
+ * themselves have state.
+ */
+class PreviewSessionState : public framing::SessionState,
+ public SessionContext,
+ public framing::FrameHandler::Chains,
+ public management::Manageable
+{
+ public:
+ ~PreviewSessionState();
+ bool isAttached() { return handler; }
+
+ void detach();
+ void attach(PreviewSessionHandler& handler);
+
+
+ PreviewSessionHandler* getHandler();
+
+ /** @pre isAttached() */
+ framing::AMQP_ClientProxy& getProxy();
+
+ /** @pre isAttached() */
+ ConnectionState& getConnection();
+
+ uint32_t getTimeout() const { return timeout; }
+ Broker& getBroker() { return broker; }
+ framing::ProtocolVersion getVersion() const { return version; }
+
+ /** OutputControl **/
+ void activateOutput();
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args);
+
+ // Normally SessionManager creates sessions.
+ PreviewSessionState(PreviewSessionManager*,
+ PreviewSessionHandler* out,
+ uint32_t timeout,
+ uint32_t ackInterval);
+
+
+ private:
+ PreviewSessionManager* factory;
+ PreviewSessionHandler* handler;
+ framing::Uuid id;
+ uint32_t timeout;
+ sys::AbsTime expiry; // Used by SessionManager.
+ Broker& broker;
+ framing::ProtocolVersion version;
+ sys::Mutex lock;
+ boost::scoped_ptr<SemanticHandler> semanticHandler;
+ management::Session::shared_ptr mgmtObject;
+
+ friend class PreviewSessionManager;
+};
+
+
+inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) {
+ return out << session.getId();
+}
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_SESSION_H*/
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
index 2a79496144..fdde7ec18c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -22,7 +22,6 @@
#include "SemanticHandler.h"
#include "SemanticState.h"
#include "SessionContext.h"
-#include "SessionState.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
#include "qpid/framing/ExecutionCompleteBody.h"
@@ -37,9 +36,9 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(SessionState& s) :
+SemanticHandler::SemanticHandler(SessionContext& s) :
state(*this,s), session(s),
- msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()),
+ msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()),
ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2))
{}
@@ -164,13 +163,8 @@ void SemanticHandler::handleContent(AMQFrame& frame)
DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
- SessionContext* handler = session.getHandler();
- if (handler) {
- uint32_t maxFrameSize = handler->getConnection().getFrameMax();
- MessageDelivery::deliver(msg, handler->out, ++outgoing.hwm, token, maxFrameSize);
- } else {
- QPID_LOG(error, "Dropping message as session is no longer attached to a channel.");
- }
+ uint32_t maxFrameSize = session.getConnection().getFrameMax();
+ MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
return outgoing.hwm;
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h
index d7f3ec8799..893a0cbded 100644
--- a/qpid/cpp/src/qpid/broker/SemanticHandler.h
+++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h
@@ -46,7 +46,7 @@ class AMQHeaderBody;
namespace broker {
-class SessionState;
+class SessionContext;
class SemanticHandler : public DeliveryAdapter,
public framing::FrameHandler,
@@ -56,7 +56,7 @@ class SemanticHandler : public DeliveryAdapter,
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
SemanticState state;
- SessionState& session;
+ SessionContext& session;
// TODO aconway 2007-09-20: Why are these on the handler rather than the
// state?
IncomingExecutionContext incoming;
@@ -78,10 +78,10 @@ class SemanticHandler : public DeliveryAdapter,
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
//Connection& getConnection() { return session.getConnection(); }
- Broker& getBroker() { return session.getBroker(); }
+ Broker& getBroker() { return session.getConnection().getBroker(); }
public:
- SemanticHandler(SessionState& session);
+ SemanticHandler(SessionContext& session);
//frame handler:
void handle(framing::AMQFrame& frame);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 7b4035604f..9b44f31e14 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -19,7 +19,7 @@
*
*/
-#include "SessionState.h"
+#include "SessionContext.h"
#include "BrokerAdapter.h"
#include "Queue.h"
#include "Connection.h"
@@ -56,7 +56,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace qpid::ptr_map;
-SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
+SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
: session(ss),
deliveryAdapter(da),
prefetchSize(0),
@@ -263,21 +263,16 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (parent->getSession().isAttached() && accept(msg.payload)) {
- allocateCredit(msg.payload);
- DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg, token);
- if (windowing || ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
- }
- if (acquire && !ackExpected) {
- queue->dequeue(0, msg.payload);
- }
- return true;
- } else {
- QPID_LOG(debug, "Failed to deliver message to '" << name << "' on " << parent);
- return false;
+ allocateCredit(msg.payload);
+ DeliveryId deliveryTag =
+ parent->deliveryAdapter.deliver(msg, token);
+ if (windowing || ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+ }
+ if (acquire && !ackExpected) {
+ queue->dequeue(0, msg.payload);
}
+ return true;
}
bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
@@ -331,7 +326,7 @@ void SemanticState::cancel(ConsumerImpl& c)
if(queue) {
queue->cancel(c);
if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
- Queue::tryAutoDelete(getSession().getBroker(), queue);
+ Queue::tryAutoDelete(session.getBroker(), queue);
}
}
}
@@ -352,7 +347,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
if (!cacheExchange || cacheExchange->getName() != exchangeName){
- cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName);
+ cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 7fc6e4167c..cc9c0e1e9b 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -45,7 +45,7 @@
namespace qpid {
namespace broker {
-class SessionState;
+class SessionContext;
/**
* SemanticState holds the L3 and L4 state of an open session, whether
@@ -98,7 +98,7 @@ class SemanticState : public framing::FrameHandler::Chains,
typedef boost::ptr_map<std::string,ConsumerImpl> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
- SessionState& session;
+ SessionContext& session;
DeliveryAdapter& deliveryAdapter;
Queue::shared_ptr defaultQueue;
ConsumerImplMap consumers;
@@ -129,10 +129,10 @@ class SemanticState : public framing::FrameHandler::Chains,
void cancel(ConsumerImpl&);
public:
- SemanticState(DeliveryAdapter&, SessionState&);
+ SemanticState(DeliveryAdapter&, SessionContext&);
~SemanticState();
- SessionState& getSession() { return session; }
+ SessionContext& getSession() { return session; }
/**
* Get named queue, never returns 0.
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index a27b43cf65..a289310b15 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -25,6 +25,7 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/OutputControl.h"
#include "ConnectionState.h"
@@ -33,17 +34,13 @@
namespace qpid {
namespace broker {
-class SessionContext : public framing::FrameHandler::InOutHandler
+class SessionContext : public sys::OutputControl
{
public:
- SessionContext(qpid::framing::OutputHandler& out) : InOutHandler(0, &out) {}
virtual ~SessionContext(){}
virtual ConnectionState& getConnection() = 0;
- virtual const ConnectionState& getConnection() const = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
- virtual const framing::AMQP_ClientProxy& getProxy() const = 0;
- virtual void detach() = 0;
- virtual framing::ChannelId getChannel() const = 0;
+ virtual Broker& getBroker() = 0;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 1cb10d0c19..0e3c9928d1 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -21,6 +21,7 @@
#include "SessionHandler.h"
#include "SessionState.h"
#include "Connection.h"
+#include "ConnectionState.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/ClientInvoker.h"
@@ -36,7 +37,7 @@ using namespace std;
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : SessionContext(c.getOutput()),
+ : InOutHandler(0, &out),
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
@@ -58,18 +59,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
//
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
- session->in.handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (!ignoring) {
- throw ChannelErrorException(
- QPID_MSG("Channel " << channel.get() << " is not open"));
+ if (!ignoring) {
+ if (m &&
+ (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) ||
+ invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+ return;
+ } else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+ return;
+ } else {
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << channel.get() << " is not open"));
+ }
}
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
@@ -91,10 +96,12 @@ void SessionHandler::handleOut(AMQFrame& f) {
}
void SessionHandler::assertAttached(const char* method) const {
- if (!session.get())
+ if (!session.get()) {
+ std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
throw ChannelErrorException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
+ }
}
void SessionHandler::assertClosed(const char* method) const {
@@ -208,4 +215,32 @@ void SessionHandler::detached()
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
+void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
+{
+ assertAttached("complete");
+ session->complete(cumulative, range);
+}
+
+void SessionHandler::flush()
+{
+ assertAttached("flush");
+ session->flush();
+}
+void SessionHandler::sync()
+{
+ assertAttached("sync");
+ session->sync();
+}
+
+void SessionHandler::noop()
+{
+ assertAttached("noop");
+ session->noop();
+}
+
+void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+{
+ //never actually sent by client at present
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 5a72bfb12d..e6bc463a82 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -28,7 +28,7 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/ChannelHandler.h"
-#include "SessionContext.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/noncopyable.hpp>
@@ -36,16 +36,18 @@ namespace qpid {
namespace broker {
class Connection;
+class ConnectionState;
class SessionState;
/**
* A SessionHandler is associated with each active channel. It
- * receives incoming frames, handles session commands and manages the
+ * receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::AMQP_ClientOperations::SessionHandler,
- public SessionContext,
+ public framing::AMQP_ServerOperations::ExecutionHandler,
+ public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
public:
@@ -90,12 +92,17 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
void detached();
+ //Execution methods:
+ void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
+ void flush();
+ void noop();
+ void result(uint32_t command, const std::string& data);
+ void sync();
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
-
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp
index aa7ac9a8bb..571d3365db 100644
--- a/qpid/cpp/src/qpid/broker/SessionManager.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp
@@ -45,7 +45,7 @@ SessionManager::~SessionManager() {}
// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
std::auto_ptr<SessionState> SessionManager::open(
- SessionContext& h, uint32_t timeout_)
+ SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
std::auto_ptr<SessionState> session(
diff --git a/qpid/cpp/src/qpid/broker/SessionManager.h b/qpid/cpp/src/qpid/broker/SessionManager.h
index 94956a83ed..7e8bd18f57 100644
--- a/qpid/cpp/src/qpid/broker/SessionManager.h
+++ b/qpid/cpp/src/qpid/broker/SessionManager.h
@@ -38,7 +38,7 @@ namespace qpid {
namespace broker {
class SessionState;
-class SessionContext;
+class SessionHandler;
/**
* Create and manage SessionState objects.
@@ -57,7 +57,7 @@ class SessionManager : private boost::noncopyable {
~SessionManager();
/** Open a new active session, caller takes ownership */
- std::auto_ptr<SessionState> open(SessionContext& c, uint32_t timeout_);
+ std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_);
/** Suspend a session, start it's timeout counter.
* The factory takes ownership.
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index b6c59cfb3b..573a567da6 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -19,12 +19,16 @@
*
*/
#include "SessionState.h"
-#include "SessionManager.h"
-#include "SessionContext.h"
-#include "ConnectionState.h"
#include "Broker.h"
+#include "ConnectionState.h"
+#include "MessageDelivery.h"
#include "SemanticHandler.h"
+#include "SessionManager.h"
+#include "SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/ServerInvoker.h"
+
+#include <boost/bind.hpp>
namespace qpid {
namespace broker {
@@ -37,17 +41,17 @@ using qpid::management::Manageable;
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionContext* h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
version(h->getConnection().getVersion()),
- semanticHandler(new SemanticHandler(*this))
+ semanticState(*this, *this),
+ adapter(semanticState),
+ msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
+ ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
{
- in.next = semanticHandler.get();
- out.next = &handler->out;
-
- getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+ getConnection().outputTasks.addOutputTask(&semanticState);
Manageable* parent = broker.GetVhostObject ();
@@ -76,7 +80,7 @@ SessionState::~SessionState() {
mgmtObject->resourceDestroy ();
}
-SessionContext* SessionState::getHandler() {
+SessionHandler* SessionState::getHandler() {
return handler;
}
@@ -91,20 +95,19 @@ ConnectionState& SessionState::getConnection() {
}
void SessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
+ getConnection().outputTasks.removeOutputTask(&semanticState);
Mutex::ScopedLock l(lock);
- handler = 0; out.next = 0;
+ handler = 0;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (0);
}
}
-void SessionState::attach(SessionContext& h) {
+void SessionState::attach(SessionHandler& h) {
{
Mutex::ScopedLock l(lock);
handler = &h;
- out.next = &handler->out;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
@@ -112,7 +115,7 @@ void SessionState::attach(SessionContext& h) {
mgmtObject->set_channelId (h.getChannel());
}
}
- h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
+ h.getConnection().outputTasks.addOutputTask(&semanticState);
}
void SessionState::activateOutput()
@@ -165,5 +168,100 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
+void SessionState::handleCommand(framing::AMQMethodBody* method)
+{
+ SequenceNumber id = incoming.next();
+ Invoker::Result invocation = invoke(adapter, *method);
+ incoming.complete(id);
+
+ if (!invocation.wasHandled()) {
+ throw NotImplementedException("Not implemented");
+ } else if (invocation.hasResult()) {
+ getProxy().getExecution().result(id.getValue(), invocation.getResult());
+ }
+ if (method->isSync()) {
+ incoming.sync(id);
+ sendCompletion();
+ }
+ //TODO: if window gets too large send unsolicited completion
+}
+
+void SessionState::handleContent(AMQFrame& frame)
+{
+ intrusive_ptr<Message> msg(msgBuilder.getMessage());
+ if (!msg) {//start of frameset will be indicated by frame flags
+ msgBuilder.start(incoming.next());
+ msg = msgBuilder.getMessage();
+ }
+ msgBuilder.handle(frame);
+ if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
+ msg->setPublisher(&getConnection());
+ semanticState.handle(msg);
+ msgBuilder.end();
+ incoming.track(msg);
+ if (msg->getFrames().getMethod()->isSync()) {
+ incoming.sync(msg->getCommandId());
+ sendCompletion();
+ }
+ }
+}
+
+void SessionState::handle(AMQFrame& frame)
+{
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content. (For now, assume all single frame
+ //assmblies are non-content bearing and all content-bearing
+ //assmeblies will have more than one frame):
+ if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod());
+ } else {
+ handleContent(frame);
+ }
+
+}
+
+DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
+{
+ uint32_t maxFrameSize = getConnection().getFrameMax();
+ MessageDelivery::deliver(msg, getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
+ return outgoing.hwm;
+}
+
+void SessionState::sendCompletion()
+{
+ SequenceNumber mark = incoming.getMark();
+ SequenceNumberSet range = incoming.getRange();
+ getProxy().getExecution().complete(mark.getValue(), range);
+}
+
+void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range)
+{
+ //record:
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ //ack messages:
+ semanticState.ackCumulative(mark.getValue());
+ }
+ range.processRanges(ackOp);
+}
+
+void SessionState::flush()
+{
+ incoming.flush();
+ sendCompletion();
+}
+
+void SessionState::sync()
+{
+ incoming.sync();
+ sendCompletion();
+}
+
+void SessionState::noop()
+{
+ incoming.noop();
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 8a12e580b7..98c21a8ab5 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -27,10 +27,15 @@
#include "qpid/framing/SessionState.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/OutputControl.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Session.h"
+#include "BrokerAdapter.h"
+#include "DeliveryAdapter.h"
+#include "MessageBuilder.h"
+#include "SessionContext.h"
+#include "SemanticState.h"
+#include "IncomingExecutionContext.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -47,8 +52,7 @@ class AMQP_ClientProxy;
namespace broker {
-class SemanticHandler;
-class SessionContext;
+class SessionHandler;
class SessionManager;
class Broker;
class ConnectionState;
@@ -58,8 +62,8 @@ class ConnectionState;
* themselves have state.
*/
class SessionState : public framing::SessionState,
- public framing::FrameHandler::Chains,
- public sys::OutputControl,
+ public SessionContext,
+ public DeliveryAdapter,
public management::Manageable
{
public:
@@ -67,10 +71,10 @@ class SessionState : public framing::SessionState,
bool isAttached() { return handler; }
void detach();
- void attach(SessionContext& handler);
+ void attach(SessionHandler& handler);
- SessionContext* getHandler();
+ SessionHandler* getHandler();
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
@@ -85,6 +89,19 @@ class SessionState : public framing::SessionState,
/** OutputControl **/
void activateOutput();
+ void handle(framing::AMQFrame& frame);
+ void handleCommand(framing::AMQMethodBody* method);
+ void handleContent(framing::AMQFrame& frame);
+
+ void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
+ void flush();
+ void noop();
+ void sync();
+ void sendCompletion();
+
+ //delivery adapter methods:
+ DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
+
// Manageable entry points
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable::status_t
@@ -92,21 +109,32 @@ class SessionState : public framing::SessionState,
// Normally SessionManager creates sessions.
SessionState(SessionManager*,
- SessionContext* out,
+ SessionHandler* out,
uint32_t timeout,
uint32_t ackInterval);
private:
+ typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
+
SessionManager* factory;
- SessionContext* handler;
+ SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
Broker& broker;
framing::ProtocolVersion version;
sys::Mutex lock;
- boost::scoped_ptr<SemanticHandler> semanticHandler;
+
+ SemanticState semanticState;
+ BrokerAdapter adapter;
+ MessageBuilder msgBuilder;
+
+ //execution state
+ IncomingExecutionContext incoming;
+ framing::Window outgoing;
+ RangedOperation ackOp;
+
management::Session::shared_ptr mgmtObject;
friend class SessionManager;
diff --git a/qpid/cpp/src/qpid/client/Session.h b/qpid/cpp/src/qpid/client/Session.h
index 3293af60fe..5d91f289e2 100644
--- a/qpid/cpp/src/qpid/client/Session.h
+++ b/qpid/cpp/src/qpid/client/Session.h
@@ -27,7 +27,7 @@ namespace qpid {
namespace client {
/**
- * Session is currently just an alias for Session_0_10
+ * Session is currently just an alias for Session_99_0
*
* \ingroup clientapi
*/
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index bca6c49c13..5152aa2e43 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,7 +17,7 @@
*/
#include "Cluster.h"
-#include "qpid/broker/SessionState.h"
+#include "qpid/broker/PreviewSessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,18 +32,18 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::SessionState;
+using broker::PreviewSessionState;
namespace {
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- SessionState& session;
+ PreviewSessionState& session;
Cluster& cluster;
bool busy;
Monitor lock;
- ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+ ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
void handle(AMQFrame& f) {
Mutex::ScopedLock l(lock);
@@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) {
c.next = h;
}
-struct SessionObserver : public broker::SessionManager::Observer {
+struct SessionObserver : public broker::PreviewSessionManager::Observer {
Cluster& cluster;
SessionObserver(Cluster& c) : cluster(c) {}
- void opened(SessionState& s) {
+ void opened(PreviewSessionState& s) {
// FIXME aconway 2008-01-29: IList for memory management.
ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index b62b2be5f1..733db8003d 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -62,7 +62,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
+ intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -116,7 +116,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- intrusive_ptr<broker::SessionManager::Observer> observer;
+ intrusive_ptr<broker::PreviewSessionManager::Observer> observer;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
index ceafa389b0..0ea3953175 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -69,7 +69,7 @@ struct ClusterPlugin : public Plugin {
cluster = boost::in_place(options.name,
options.getUrl(broker->getPort()),
boost::ref(*broker));
- broker->getSessionManager().add(cluster->getObserver());
+ broker->getPreviewSessionManager().add(cluster->getObserver());
}
}
};
diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
index 1be8856c13..b15e14d6f6 100644
--- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
+++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
@@ -33,6 +33,7 @@ namespace qpid {
namespace framing {
static ProtocolVersion highestProtocolVersion(99, 0);
+//static ProtocolVersion highestProtocolVersion(0, 10);
} /* namespace framing */
} /* namespace qpid */
diff --git a/qpid/cpp/src/qpid/framing/Proxy.h b/qpid/cpp/src/qpid/framing/Proxy.h
index b6ac897e96..86b99a83b0 100644
--- a/qpid/cpp/src/qpid/framing/Proxy.h
+++ b/qpid/cpp/src/qpid/framing/Proxy.h
@@ -39,6 +39,7 @@ class Proxy
void send(const AMQBody&);
ProtocolVersion getVersion() const;
+ FrameHandler& getHandler() { return out; }
protected:
FrameHandler& out;
diff --git a/qpid/cpp/src/tests/exception_test.cpp b/qpid/cpp/src/tests/exception_test.cpp
index 700aeef47c..715cdaec2a 100644
--- a/qpid/cpp/src/tests/exception_test.cpp
+++ b/qpid/cpp/src/tests/exception_test.cpp
@@ -92,7 +92,7 @@ BOOST_FIXTURE_TEST_CASE(DisconnectedListen, ProxySessionFixture) {
BOOST_CHECK_THROW(session.close(), InternalErrorException);
}
-BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, SessionFixture) {
+BOOST_FIXTURE_TEST_CASE(NoSuchQueueTest, ProxySessionFixture) {
BOOST_CHECK_THROW(subs.subscribe(lq, "no such queue").sync(), NotFoundException);
}
diff --git a/qpid/cpp/src/tests/serialize.cpp b/qpid/cpp/src/tests/serialize.cpp
index 72a92ee226..a120be6458 100644
--- a/qpid/cpp/src/tests/serialize.cpp
+++ b/qpid/cpp/src/tests/serialize.cpp
@@ -79,7 +79,7 @@ template <class A, class B, class C, class D> struct concat4 { typedef typename
typedef mpl::vector<Bit, Boolean, Char, Int32, Int64, Int8, Uint16, CharUtf32, Uint32, Uint64, Bin8, Uint8>::type IntegralTypes;
typedef mpl::vector<Bin1024, Bin128, Bin16, Bin256, Bin32, Bin40, Bin512, Bin64, Bin72>::type BinTypes;
typedef mpl::vector<Double, Float>::type FloatTypes;
-typedef mpl::vector<SequenceNo, Uuid, DateTime, Dec32, Dec64> FixedSizeClassTypes;
+typedef mpl::vector<SequenceNo, Uuid, Datetime, Dec32, Dec64> FixedSizeClassTypes;
typedef mpl::vector<Vbin8, Str8Latin, Str8, Str8Utf16, Vbin16, Str16Latin, Str16, Str16Utf16, Vbin32> VariableSizeTypes;
@@ -106,7 +106,7 @@ BOOST_AUTO_TEST_CASE(testNetworkByteOrder) {
void testValue(bool& b) { b = true; }
template <class T> typename boost::enable_if<boost::is_arithmetic<T> >::type testValue(T& n) { n=42; }
void testValue(long long& l) { l = 12345; }
-void testValue(DateTime& dt) { dt = qpid::sys::now(); }
+void testValue(Datetime& dt) { dt = qpid::sys::now(); }
void testValue(Uuid& uuid) { uuid=Uuid(true); }
template <class E, class M> void testValue(Decimal<E,M>& d) { d.exponent=2; d.mantissa=1234; }
void testValue(SequenceNo& s) { s = 42; }
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index bbdb501b80..d1e3293a3e 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -20,7 +20,7 @@
-
-->
-<amqp major="0" minor="10" port="5672">
+<amqp major="99" minor="0" port="5672">
<class name = "cluster" index = "201">