diff options
author | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
commit | 9d199b74aee76859480a7ee92d95c6db42028b43 (patch) | |
tree | ca09aace4aaac2afa9650cc78833d30b056313a9 | |
parent | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (diff) | |
download | qpid-python-9d199b74aee76859480a7ee92d95c6db42028b43.tar.gz |
QPID-1327 - Event support for Management
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702651 13f79535-47bb-0310-9956-ffa450edef68
37 files changed, 1246 insertions, 847 deletions
diff --git a/cpp/examples/qmf-agent/example.cpp b/cpp/examples/qmf-agent/example.cpp index 7c514b1010..e24e729e1d 100644 --- a/cpp/examples/qmf-agent/example.cpp +++ b/cpp/examples/qmf-agent/example.cpp @@ -26,6 +26,7 @@ #include "qmf/org/apache/qpid/agent/example/Parent.h" #include "qmf/org/apache/qpid/agent/example/Child.h" #include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h" +#include "qmf/org/apache/qpid/agent/example/EventChildCreated.h" #include "qmf/org/apache/qpid/agent/example/Package.h" #include <unistd.h> @@ -129,7 +130,7 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, children.push_back(child); - mgmtObject->event_childCreated(ioArgs.i_name); + agent->raiseEvent(_qmf::EventChildCreated(ioArgs.i_name)); return STATUS_OK; } diff --git a/cpp/examples/qmf-agent/schema.xml b/cpp/examples/qmf-agent/schema.xml index 22ecabb8a5..1bf701a655 100644 --- a/cpp/examples/qmf-agent/schema.xml +++ b/cpp/examples/qmf-agent/schema.xml @@ -37,14 +37,6 @@ <arg name="name" dir="I" type="sstr"/> <arg name="childRef" dir="O" type="objId"/> </method> - - <event name="childCreated"> - <arg name="name" type="sstr"/> - </event> - - <event name="childDestroyed"> - <arg name="name" type="sstr"/> - </event> </class> @@ -59,7 +51,14 @@ <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> - <method name="delete"/> + <method name="delete"/> </class> + + <eventArguments> + <arg name="childName" type="sstr"/> + </eventArguments> + + <event name="ChildCreated" args="childName"/> + <event name="ChildDestroyed" args="childName"/> </schema> diff --git a/cpp/managementgen/qmf-gen b/cpp/managementgen/qmf-gen index 523579fe6c..62362e3cad 100755 --- a/cpp/managementgen/qmf-gen +++ b/cpp/managementgen/qmf-gen @@ -58,6 +58,8 @@ for schemafile in args: gen.makeClassFiles ("Class.h", package) gen.makeClassFiles ("Class.cpp", package) gen.makeMethodFiles ("Args.h", package) + gen.makeEventFiles ("Event.h", package) + gen.makeEventFiles ("Event.cpp", package) gen.makePackageFile ("Package.h", package) gen.makePackageFile ("Package.cpp", package) diff --git a/cpp/managementgen/qmf/generate.py b/cpp/managementgen/qmf/generate.py index 7346200a28..958728d739 100755 --- a/cpp/managementgen/qmf/generate.py +++ b/cpp/managementgen/qmf/generate.py @@ -250,6 +250,14 @@ class Generator: path = self.packagePath + _class.getNameCap () + extension return path + def targetEventFile (self, event, templateFile): + dot = templateFile.find(".") + if dot == -1: + raise ValueError ("Invalid template file name %s" % templateFile) + extension = templateFile[dot:len (templateFile)] + path = self.packagePath + "Event" + event.getNameCap () + extension + return path + def targetMethodFile (self, method, templateFile): """ Return the file name for a method file """ dot = templateFile.rfind(".") @@ -293,6 +301,16 @@ class Generator: stream = template.expand (_class) self.writeIfChanged (stream, target, force) + def makeEventFiles (self, templateFile, schema, force=False): + """ Generate an expanded template per schema event """ + events = schema.getEvents() + template = Template (self.input + templateFile, self) + self.templateFiles.append (templateFile) + for event in events: + target = self.targetEventFile(event, templateFile) + stream = template.expand(event) + self.writeIfChanged(stream, target, force) + def makeMethodFiles (self, templateFile, schema, force=False): """ Generate an expanded template per method-with-arguments """ classes = schema.getClasses () diff --git a/cpp/managementgen/qmf/management-types.xml b/cpp/managementgen/qmf/management-types.xml index 6e34421d99..626880afb3 100644 --- a/cpp/managementgen/qmf/management-types.xml +++ b/cpp/managementgen/qmf/management-types.xml @@ -30,7 +30,7 @@ <type name="int64" base="S64" cpp="int64_t" encode="@.putInt64(#)" decode="# = @.getInt64()" accessor="direct" init="0"/> <type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet(#?1:0)" decode="# = @.getOctet()==1" accessor="direct" init="0"/> <type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString(#)" decode="@.getShortString(#)" accessor="direct" init='""' byRef="y"/> -<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString(#)" decode="@.getLongString(#)" accessor="direct" init='""' byRef="y"/> +<type name="lstr" base="LSTR" cpp="std::string" encode="@.putMediumString(#)" decode="@.getMediumString(#)" accessor="direct" init='""' byRef="y"/> <type name="absTime" base="ABSTIME" cpp="int64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" accessor="direct" init="0"/> <type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" accessor="direct" init="0"/> <type name="float" base="FLOAT" cpp="float" encode="@.putFloat(#)" decode="# = @.getFloat()" accessor="direct" init="0."/> diff --git a/cpp/managementgen/qmf/schema.py b/cpp/managementgen/qmf/schema.py index 2ecf9c351f..48e697ab4a 100755 --- a/cpp/managementgen/qmf/schema.py +++ b/cpp/managementgen/qmf/schema.py @@ -21,6 +21,32 @@ from xml.dom.minidom import parse, parseString, Node from cStringIO import StringIO import md5 +class Hash: + """ Manage the hash of an XML sub-tree """ + def __init__(self, node): + self.md5Sum = md5.new() + self._compute(node) + + def addSubHash(self, hash): + """ Use this method to add the hash of a dependend-on XML fragment that is not in the sub-tree """ + self.md5Sum.update(hash.getDigest()) + + def getDigest(self): + return self.md5Sum.digest() + + def _compute(self, node): + attrs = node.attributes + self.md5Sum.update(node.nodeName) + + for idx in range(attrs.length): + self.md5Sum.update(attrs.item(idx).nodeName) + self.md5Sum.update(attrs.item(idx).nodeValue) + + for child in node.childNodes: + if child.nodeType == Node.ELEMENT_NODE: + self._compute(child) + + #===================================================================================== # #===================================================================================== @@ -525,6 +551,7 @@ class SchemaArg: self.maxLen = None self.desc = None self.default = None + self.hash = Hash(node) attrs = node.attributes for idx in range (attrs.length): @@ -675,12 +702,12 @@ class SchemaMethod: # #===================================================================================== class SchemaEvent: - def __init__ (self, parent, node, typespec): - self.parent = parent - self.name = None - self.desc = None - self.args = [] - self.defaultSeverity = None + def __init__ (self, package, node, typespec, argset): + self.packageName = package + self.name = None + self.desc = None + self.args = [] + self.hash = Hash(node) attrs = node.attributes for idx in range (attrs.length): @@ -692,72 +719,96 @@ class SchemaEvent: elif key == 'desc': self.desc = val - elif key == 'defaultSeverity': - self.defaultSeverity = val + elif key == 'args': + list = val.replace(" ", "").split(",") + for item in list: + if item not in argset.args: + raise Exception("undefined argument '%s' in event" % item) + self.args.append(argset.args[item]) + self.hash.addSubHash(argset.args[item].hash) else: raise ValueError ("Unknown attribute in event '%s'" % key) - for child in node.childNodes: - if child.nodeType == Node.ELEMENT_NODE: - if child.nodeName == 'arg': - arg = SchemaArg (child, typespec) - self.args.append (arg) - else: - raise ValueError ("Unknown event tag '%s'" % child.nodeName) - def getName (self): return self.name + def getNameCap(self): + return capitalize(self.name) + def getFullName (self): - return capitalize(self.parent.getName()) + capitalize(self.name) + return capitalize(self.package + capitalize(self.name)) def getArgCount (self): return len (self.args) - def genMethodBody (self, stream, variables, classObject): - stream.write("void ") - classObject.genNameCap(stream, variables) - stream.write("::event_%s(" % self.name) - count = 0 + def genArgCount (self, stream, variables): + stream.write("%d" % len(self.args)) + + def genArgDeclarations(self, stream, variables): for arg in self.args: - arg.genFormalParam(stream, variables) - count += 1 - if count < len(self.args): - stream.write(", ") - stream.write(") {\n") - stream.write(" ::qpid::sys::Mutex::ScopedLock mutex(getMutex());\n") - stream.write(" Buffer* buf = startEventLH();\n") - stream.write(" objectId.encode(*buf);\n") - stream.write(" buf->putShortString(packageName);\n") - stream.write(" buf->putShortString(className);\n") - stream.write(" buf->putBin128(md5Sum);\n") - stream.write(" buf->putShortString(\"%s\");\n" % self.name) + if arg.type.type.byRef: + ref = "&" + else: + ref = "" + stream.write(" const %s%s %s;\n" % (arg.type.type.cpp, ref, arg.name)) + + def genCloseNamespaces (self, stream, variables): + for item in self.packageName.split("."): + stream.write ("}") + + def genConstructorArgs(self, stream, variables): + pre = "" for arg in self.args: - stream.write(" %s;\n" % arg.type.type.encode.replace("@", "(*buf)").replace("#", "_" + arg.name)) - stream.write(" finishEventLH(buf);\n") - stream.write("}\n\n") + if arg.type.type.byRef: + ref = "&" + else: + ref = "" + stream.write("%sconst %s%s _%s" % (pre, arg.type.type.cpp, ref, arg.name)) + pre = ",\n " - def genMethodDecl (self, stream, variables): - stream.write(" void event_%s(" % self.name) - count = 0 + def genConstructorInits(self, stream, variables): + pre = "" for arg in self.args: - arg.genFormalParam(stream, variables) - count += 1 - if count < len(self.args): - stream.write(", ") - stream.write(");\n") + stream.write("%s%s(_%s)" % (pre, arg.name, arg.name)) + pre = ",\n " - def genSchema(self, stream, variables): - stream.write (" ft = FieldTable ();\n") - stream.write (" ft.setString (NAME, \"" + self.name + "\");\n") - stream.write (" ft.setInt (ARGCOUNT, " + str (len (self.args)) + ");\n") - if self.desc != None: - stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n") - stream.write (" buf.put (ft);\n\n") + def genName(self, stream, variables): + stream.write(self.name) + + def genNameCap(self, stream, variables): + stream.write(capitalize(self.name)) + + def genNamespace (self, stream, variables): + stream.write("::".join(self.packageName.split("."))) + + def genNameLower(self, stream, variables): + stream.write(self.name.lower()) + + def genNameUpper(self, stream, variables): + stream.write(self.name.upper()) + + def genNamePackageLower(self, stream, variables): + stream.write(self.packageName.lower()) + + def genOpenNamespaces (self, stream, variables): + for item in self.packageName.split("."): + stream.write ("namespace %s {\n" % item) + + def genArgEncodes(self, stream, variables): + for arg in self.args: + stream.write(" " + arg.type.type.encode.replace("@", "buf").replace("#", arg.name) + ";\n") + + def genArgSchema(self, stream, variables): for arg in self.args: - arg.genSchema (stream, True) + arg.genSchema(stream, True) + def genSchemaMD5(self, stream, variables): + sum = self.hash.getDigest() + for idx in range (len (sum)): + if idx != 0: + stream.write (",") + stream.write (hex (ord (sum[idx]))) class SchemaClass: @@ -768,9 +819,7 @@ class SchemaClass: self.methods = [] self.events = [] self.options = options - self.md5Sum = md5.new () - - self.hash (node) + self.hash = Hash(node) attrs = node.attributes self.name = makeValidCppSymbol(attrs['name'].nodeValue) @@ -790,10 +839,6 @@ class SchemaClass: sub = SchemaMethod (self, child, typespec) self.methods.append (sub) - elif child.nodeName == 'event': - sub = SchemaEvent (self, child, typespec) - self.events.append (sub) - elif child.nodeName == 'group': self.expandFragment (child, fragments) @@ -820,24 +865,12 @@ class SchemaClass: result = result[0:pos] + "threadStats->" + result[pos:] start = pos + 9 + len(next[1]) - def hash (self, node): - attrs = node.attributes - self.md5Sum.update (node.nodeName) - - for idx in range (attrs.length): - self.md5Sum.update (attrs.item(idx).nodeName) - self.md5Sum.update (attrs.item(idx).nodeValue) - - for child in node.childNodes: - if child.nodeType == Node.ELEMENT_NODE: - self.hash (child) - def expandFragment (self, node, fragments): attrs = node.attributes name = attrs['name'].nodeValue for fragment in fragments: if fragment.name == name: - self.md5Sum.update (fragment.md5Sum.digest()) + self.hash.addSubHash(fragment.hash) for config in fragment.properties: self.properties.append (config) for inst in fragment.statistics: @@ -937,27 +970,12 @@ class SchemaClass: inArgCount = inArgCount + 1 if methodCount == 0: - stream.write ("string, Buffer&, Buffer& outBuf") + stream.write ("string&, Buffer&, Buffer& outBuf") else: if inArgCount == 0: - stream.write ("string methodName, Buffer&, Buffer& outBuf") + stream.write ("string& methodName, Buffer&, Buffer& outBuf") else: - stream.write ("string methodName, Buffer& inBuf, Buffer& outBuf") - - def genEventCount (self, stream, variables): - stream.write ("%d" % len (self.events)) - - def genEventMethodBodies (self, stream, variables): - for event in self.events: - event.genMethodBody (stream, variables, self) - - def genEventMethodDecls (self, stream, variables): - for event in self.events: - event.genMethodDecl (stream, variables) - - def genEventSchema (self, stream, variables): - for event in self.events: - event.genSchema (stream, variables) + stream.write ("string& methodName, Buffer& inBuf, Buffer& outBuf") def genHiLoStatResets (self, stream, variables): for inst in self.statistics: @@ -1124,7 +1142,7 @@ class SchemaClass: return def genSchemaMD5 (self, stream, variables): - sum = self.md5Sum.digest () + sum = self.hash.getDigest() for idx in range (len (sum)): if idx != 0: stream.write (",") @@ -1149,6 +1167,20 @@ class SchemaClass: stat.genWrite (stream) +class SchemaEventArgs: + def __init__(self, package, node, typespec, fragments, options): + self.packageName = package + self.options = options + self.args = {} + + children = node.childNodes + for child in children: + if child.nodeType == Node.ELEMENT_NODE: + if child.nodeName == 'arg': + arg = SchemaArg(child, typespec) + self.args[arg.name] = arg + else: + raise Exception("Unknown tag '%s' in <eventArguments>" % child.nodeName) class SchemaPackage: def __init__ (self, typefile, schemafile, options): @@ -1156,6 +1188,8 @@ class SchemaPackage: self.classes = [] self.fragments = [] self.typespec = TypeSpec (typefile) + self.eventArgSet = None + self.events = [] dom = parse (schemafile) document = dom.documentElement @@ -1179,6 +1213,15 @@ class SchemaPackage: self.fragments, options) self.fragments.append (cls) + elif child.nodeName == 'eventArguments': + if self.eventArgSet: + raise Exception("Only one <eventArguments> may appear in a package") + self.eventArgSet = SchemaEventArgs(self.packageName, child, self.typespec, self.fragments, options) + + elif child.nodeName == 'event': + event = SchemaEvent(self.packageName, child, self.typespec, self.eventArgSet) + self.events.append(event) + else: raise ValueError ("Unknown schema tag '%s'" % child.nodeName) @@ -1194,6 +1237,9 @@ class SchemaPackage: def getClasses (self): return self.classes + def getEvents(self): + return self.events + def genCloseNamespaces (self, stream, variables): for item in self.packageName.split("."): stream.write ("}") @@ -1217,12 +1263,20 @@ class SchemaPackage: stream.write ("#include \"") _class.genNameCap (stream, variables) stream.write (".h\"\n") + for _event in self.events: + stream.write ("#include \"Event") + _event.genNameCap(stream, variables) + stream.write (".h\"\n") - def genClassRegisters (self, stream, variables): + def genClassRegisters(self, stream, variables): for _class in self.classes: - stream.write (" ") - _class.genNameCap (stream, variables) - stream.write ("::registerClass(agent);\n") + stream.write(" ") + _class.genNameCap(stream, variables) + stream.write("::registerSelf(agent);\n") + for _event in self.events: + stream.write(" Event") + _event.genNameCap(stream, variables) + stream.write("::registerSelf(agent);\n") #===================================================================================== diff --git a/cpp/managementgen/qmf/templates/Class.cpp b/cpp/managementgen/qmf/templates/Class.cpp index 964e6f8349..0a69939821 100644 --- a/cpp/managementgen/qmf/templates/Class.cpp +++ b/cpp/managementgen/qmf/templates/Class.cpp @@ -85,9 +85,9 @@ namespace { const string DEFAULT("default"); } -void /*MGEN:Class.NameCap*/::registerClass(ManagementAgent* agent) +void /*MGEN:Class.NameCap*/::registerSelf(ManagementAgent* agent) { - agent->RegisterClass(packageName, className, md5Sum, writeSchema); + agent->registerClass(packageName, className, md5Sum, writeSchema); } void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) @@ -95,13 +95,13 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) FieldTable ft; // Schema class header: + buf.putOctet (CLASS_KIND_TABLE); buf.putShortString (packageName); // Package Name buf.putShortString (className); // Class Name buf.putBin128 (md5Sum); // Schema Hash buf.putShort (/*MGEN:Class.ConfigCount*/); // Config Element Count buf.putShort (/*MGEN:Class.InstCount*/); // Inst Element Count buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count - buf.putShort (/*MGEN:Class.EventCount*/); // Event Count // Properties /*MGEN:Class.PropertySchema*/ @@ -109,8 +109,6 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) /*MGEN:Class.StatisticSchema*/ // Methods /*MGEN:Class.MethodSchema*/ - // Events -/*MGEN:Class.EventSchema*/ } /*MGEN:IF(Class.ExistPerThreadStats)*/ @@ -176,9 +174,8 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; std::string text; + /*MGEN:Class.MethodHandlers*/ outBuf.putLong(status); outBuf.putShortString(Manageable::StatusText(status, text)); } - -/*MGEN:Class.EventMethodBodies*/ diff --git a/cpp/managementgen/qmf/templates/Class.h b/cpp/managementgen/qmf/templates/Class.h index 99ebc68789..2a995c95a5 100644 --- a/cpp/managementgen/qmf/templates/Class.h +++ b/cpp/managementgen/qmf/templates/Class.h @@ -72,7 +72,7 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject void writeProperties (::qpid::framing::Buffer& buf); void writeStatistics (::qpid::framing::Buffer& buf, bool skipHeaders = false); - void doMethod (std::string methodName, + void doMethod (std::string& methodName, ::qpid::framing::Buffer& inBuf, ::qpid::framing::Buffer& outBuf); writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; } @@ -88,17 +88,15 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject /*MGEN:Class.SetGeneralReferenceDeclaration*/ - static void registerClass (::qpid::management::ManagementAgent* agent); - std::string& getPackageName (void) { return packageName; } - std::string& getClassName (void) { return className; } - uint8_t* getMd5Sum (void) { return md5Sum; } + static void registerSelf (::qpid::management::ManagementAgent* agent); + std::string& getPackageName (void) const { return packageName; } + std::string& getClassName (void) const { return className; } + uint8_t* getMd5Sum (void) const { return md5Sum; } // Method IDs /*MGEN:Class.MethodIdDeclarations*/ // Accessor Methods /*MGEN:Class.AccessorMethods*/ - // Event Methods -/*MGEN:Class.EventMethodDecls*/ }; }/*MGEN:Class.CloseNamespaces*/ diff --git a/cpp/managementgen/qmf/templates/Event.cpp b/cpp/managementgen/qmf/templates/Event.cpp new file mode 100644 index 0000000000..cdb40c6d79 --- /dev/null +++ b/cpp/managementgen/qmf/templates/Event.cpp @@ -0,0 +1,77 @@ +/*MGEN:commentPrefix=//*/ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +/*MGEN:Root.Disclaimer*/ + +#include "qpid/log/Statement.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/management/Manageable.h" +#include "qpid/agent/ManagementAgent.h" +#include "Event/*MGEN:Event.NameCap*/.h" + +using namespace qmf::/*MGEN:Event.Namespace*/; +using namespace qpid::framing; +using qpid::management::ManagementAgent; +using qpid::management::Manageable; +using qpid::management::ManagementObject; +using qpid::management::Args; +using std::string; + +string Event/*MGEN:Event.NameCap*/::packageName = string ("/*MGEN:Event.NamePackageLower*/"); +string Event/*MGEN:Event.NameCap*/::eventName = string ("/*MGEN:Event.Name*/"); +uint8_t Event/*MGEN:Event.NameCap*/::md5Sum[16] = + {/*MGEN:Event.SchemaMD5*/}; + +Event/*MGEN:Event.NameCap*/::Event/*MGEN:Event.NameCap*/ (/*MGEN:Event.ConstructorArgs*/) : + /*MGEN:Event.ConstructorInits*/ +{} + +namespace { + const string NAME("name"); + const string TYPE("type"); + const string DESC("desc"); + const string ARGCOUNT("argCount"); + const string ARGS("args"); +} + +void Event/*MGEN:Event.NameCap*/::registerSelf(ManagementAgent* agent) +{ + agent->registerEvent(packageName, eventName, md5Sum, writeSchema); +} + +void Event/*MGEN:Event.NameCap*/::writeSchema (Buffer& buf) +{ + FieldTable ft; + + // Schema class header: + buf.putOctet (CLASS_KIND_EVENT); + buf.putShortString (packageName); // Package Name + buf.putShortString (eventName); // Event Name + buf.putBin128 (md5Sum); // Schema Hash + buf.putShort (/*MGEN:Event.ArgCount*/); // Argument Count + + // Arguments +/*MGEN:Event.ArgSchema*/ +} + +void Event/*MGEN:Event.NameCap*/::encode(::qpid::framing::Buffer& buf) const +{ +/*MGEN:Event.ArgEncodes*/ +} diff --git a/cpp/managementgen/qmf/templates/Event.h b/cpp/managementgen/qmf/templates/Event.h new file mode 100644 index 0000000000..a943c0c501 --- /dev/null +++ b/cpp/managementgen/qmf/templates/Event.h @@ -0,0 +1,58 @@ +/*MGEN:commentPrefix=//*/ +#ifndef _MANAGEMENT_/*MGEN:Event.NameUpper*/_ +#define _MANAGEMENT_/*MGEN:Event.NameUpper*/_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +/*MGEN:Root.Disclaimer*/ + +#include "qpid/management/ManagementEvent.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/Uuid.h" + +namespace qmf { +/*MGEN:Event.OpenNamespaces*/ + +class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent +{ + private: + static void writeSchema (::qpid::framing::Buffer& buf); + static std::string packageName; + static std::string eventName; + static uint8_t md5Sum[16]; + +/*MGEN:Event.ArgDeclarations*/ + + public: + writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; } + + Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/); + ~Event/*MGEN:Class.NameCap*/() {}; + + static void registerSelf(::qpid::management::ManagementAgent* agent); + std::string& getPackageName() const { return packageName; } + std::string& getEventName() const { return eventName; } + uint8_t* getMd5Sum() const { return md5Sum; } + void encode(::qpid::framing::Buffer& buffer) const; +}; + +}/*MGEN:Event.CloseNamespaces*/ + +#endif /*!_MANAGEMENT_/*MGEN:Event.NameUpper*/_*/ diff --git a/cpp/src/qpid/acl/Acl.cpp b/cpp/src/qpid/acl/Acl.cpp index 0a793c88e0..bc932d836c 100644 --- a/cpp/src/qpid/acl/Acl.cpp +++ b/cpp/src/qpid/acl/Acl.cpp @@ -25,6 +25,10 @@ #include "qpid/shared_ptr.h" #include "qpid/log/Logger.h" #include "qmf/org/apache/qpid/acl/Package.h" +#include "qmf/org/apache/qpid/acl/EventAllow.h" +#include "qmf/org/apache/qpid/acl/EventDeny.h" +#include "qmf/org/apache/qpid/acl/EventFileLoaded.h" +#include "qmf/org/apache/qpid/acl/EventFileLoadFailed.h" #include <map> @@ -41,7 +45,7 @@ namespace _qmf = qmf::org::apache::qpid::acl; Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transferAcl(false) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + agent = ManagementAgent::Singleton::getInstance(); if (agent != 0){ _qmf::Package packageInit(agent); @@ -86,7 +90,11 @@ Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transfer switch (aclreslt) { case ALLOWLOG: - QPID_LOG(info, "ACL Allow id:" << id <<" action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name ); + QPID_LOG(info, "ACL Allow id:" << id <<" action:" << AclHelper::getActionStr(action) << + " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name ); + agent->raiseEvent(_qmf::EventAllow(id, AclHelper::getActionStr(action), + AclHelper::getObjectTypeStr(objType), + name, framing::FieldTable())); case ALLOW: return true; case DENY: @@ -94,13 +102,12 @@ Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transfer return false; case DENYLOG: if (mgmtObject!=0) mgmtObject->inc_aclDenyCount(); - default: - QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name); - if (mgmtObject!=0){ - framing::FieldTable _params; - mgmtObject->event_aclEvent(1, id, AclHelper::getActionStr(action),AclHelper::getObjectTypeStr(objType),name, _params); - } - return false; + default: + QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name); + agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action), + AclHelper::getObjectTypeStr(objType), + name, framing::FieldTable())); + return false; } return false; } @@ -115,7 +122,7 @@ Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transfer boost::shared_ptr<AclData> d(new AclData); AclReader ar; if (ar.read(aclFile, d)){ - mgmtObject->event_fileNotLoaded("","See log for file load reason failure"); + agent->raiseEvent(_qmf::EventFileLoadFailed("", "See log for file load reason failure")); return false; } @@ -127,7 +134,7 @@ Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transfer sys::AbsTime now = sys::AbsTime::now(); int64_t ns = sys::Duration(now); mgmtObject->set_lastAclLoad(ns); - mgmtObject->event_fileLoaded(""); + agent->raiseEvent(_qmf::EventFileLoaded("")); } return true; } diff --git a/cpp/src/qpid/acl/Acl.h b/cpp/src/qpid/acl/Acl.h index fe1c1500bb..8a3825f683 100644 --- a/cpp/src/qpid/acl/Acl.h +++ b/cpp/src/qpid/acl/Acl.h @@ -27,6 +27,7 @@ #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" #include "qpid/management/Manageable.h" +#include "qpid/agent/ManagementAgent.h" #include "qmf/org/apache/qpid/acl/Acl.h" #include <map> @@ -57,7 +58,7 @@ private: bool transferAcl; boost::shared_ptr<AclData> data; qmf::org::apache::qpid::acl::Acl* mgmtObject; // mgnt owns lifecycle - + qpid::management::ManagementAgent* agent; public: Acl (AclValues& av, broker::Broker& b); diff --git a/cpp/src/qpid/acl/management-schema.xml b/cpp/src/qpid/acl/management-schema.xml index 7d20353755..f362561356 100644 --- a/cpp/src/qpid/acl/management-schema.xml +++ b/cpp/src/qpid/acl/management-schema.xml @@ -17,32 +17,28 @@ --> <class name="acl"> - <property name="brokerRef" type="objId" references="qpid.Broker" access="RO" index="y" parentRef="y"/> - <property name="policyFile" type="sstr" access="RO" desc="Name of the policy file"/> - <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/> - <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/> - <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/> - <statistic name="aclDenyCount" type="count64" unit="record" desc="Number of ACL requests denied"/> + <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/> + <property name="policyFile" type="sstr" access="RO" desc="Name of the policy file"/> + <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/> + <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/> + <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/> + <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/> <method name="reloadACLFile" desc="Reload the ACL file"/> + </class> - <event name="aclEvent" defaultSeverity="info" desc="Event generated by the ACL policy"> - <arg name="denied" type="bool"/> - <arg name="authId" type="sstr"/> - <arg name="action" type="sstr"/> - <arg name="objType" type="sstr"/> - <arg name="name" type="sstr"/> - <arg name="params" type="map"/> - </event> - - <event name="fileLoaded" defaultSeverity="warning" desc="ACL file successfully loaded - New policy in effect"> - <arg name="authId" type="sstr" desc="Name of user who initiated the file load"/> - </event> + <eventArguments> + <arg name="action" type="sstr"/> + <arg name="arguments" type="map"/> + <arg name="objectName" type="sstr"/> + <arg name="objectType" type="sstr"/> + <arg name="reason" type="sstr"/> + <arg name="userId" type="sstr"/> + </eventArguments> - <event name="fileNotLoaded" defaultSeverity="error" desc="Replacement ACL file could not be loaded"> - <arg name="authId" type="sstr" desc="Name of user who initiated the file load"/> - <arg name="reason" type="sstr" desc="Reason for failure"/> - </event> - </class> + <event name="allow" args="userId, action, objectType, objectName, arguments"/> + <event name="deny" args="userId, action, objectType, objectName, arguments"/> + <event name="fileLoaded" args="userId"/> + <event name="fileLoadFailed" args="userId, reason"/> </schema> diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h index 1c219f7463..6af9abc26b 100644 --- a/cpp/src/qpid/agent/ManagementAgent.h +++ b/cpp/src/qpid/agent/ManagementAgent.h @@ -21,6 +21,7 @@ // #include "qpid/management/ManagementObject.h" +#include "qpid/management/ManagementEvent.h" #include "qpid/management/Manageable.h" #include "qpid/sys/Mutex.h" @@ -43,8 +44,8 @@ class ManagementAgent static ManagementAgent* agent; }; - ManagementAgent () {} - virtual ~ManagementAgent () {} + ManagementAgent() {} + virtual ~ManagementAgent() {} virtual int getMaxThreads() = 0; @@ -78,10 +79,16 @@ class ManagementAgent // package initializer generated by the management code generator. // virtual void - RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) = 0; + registerClass(std::string& packageName, + std::string& className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) = 0; + + virtual void + registerEvent(std::string& packageName, + std::string& eventName, + uint8_t* md5Sum, + management::ManagementEvent::writeSchemaCall_t schemaCall) = 0; // Add a management object to the agent. Once added, this object shall be visible // in the greater management context. @@ -97,8 +104,11 @@ class ManagementAgent // pointer. This allows the management agent to report the deletion of the object // in an orderly way. // - virtual ObjectId addObject (ManagementObject* objectPtr, - uint64_t persistId = 0) = 0; + virtual ObjectId addObject(ManagementObject* objectPtr, uint64_t persistId = 0) = 0; + + // + // + virtual void raiseEvent(const ManagementEvent& event) = 0; // If "useExternalThread" was set to true in init, this method must // be called to provide a thread for any pending method calls that have arrived. @@ -113,7 +123,7 @@ class ManagementAgent // to pollCallbacks are necessary to clear the backlog. If callLimit is zero, // the return value will also be zero. // - virtual uint32_t pollCallbacks (uint32_t callLimit = 0) = 0; + virtual uint32_t pollCallbacks(uint32_t callLimit = 0) = 0; // If "useExternalThread" was set to true in the constructor, this method provides // a standard file descriptor that can be used in a select statement to signal that @@ -121,14 +131,7 @@ class ManagementAgent // least one method call). When this fd is ready-for-read, pollCallbacks may be // invoked. Calling pollCallbacks shall reset the ready-to-read state of the fd. // - virtual int getSignalFd (void) = 0; - -protected: - friend class ManagementObject; - virtual sys::Mutex& getMutex() = 0; - virtual framing::Buffer* startEventLH() = 0; - virtual void finishEventLH(framing::Buffer* buf) = 0; - + virtual int getSignalFd() = 0; }; }} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 6a1542b1f2..b178d0fc28 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -75,12 +75,13 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() return agent; } -const string ManagementAgentImpl::storeMagicNumber("MA01"); +const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : extThread(false), writeFd(-1), readFd(-1), - clientWasAdded(true), requestedBank(0), - assignedBank(0), brokerBank(0), bootSequence(0), + connected(false), lastFailure("never connected"), + clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), + assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), connThreadBody(*this), connThread(connThreadBody), pubThreadBody(*this), pubThread(pubThreadBody) { @@ -122,18 +123,24 @@ void ManagementAgentImpl::init(string brokerHost, storeData(true); } -ManagementAgentImpl::~ManagementAgentImpl() -{ +void ManagementAgentImpl::registerClass(std::string& packageName, + std::string& className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = findOrAddPackage(packageName); + addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); } -void ManagementAgentImpl::RegisterClass(std::string packageName, - std::string className, - uint8_t* md5Sum, +void ManagementAgentImpl::registerEvent(std::string& packageName, + std::string& eventName, + uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); - PackageMap::iterator pIter = FindOrAddPackage(packageName); - AddClassLocal(pIter, className, md5Sum, schemaCall); + PackageMap::iterator pIter = findOrAddPackage(packageName); + addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } ObjectId ManagementAgentImpl::addObject(ManagementObject* object, @@ -151,6 +158,23 @@ ObjectId ManagementAgentImpl::addObject(ManagementObject* object, return objectId; } +void ManagementAgentImpl::raiseEvent(const ManagementEvent& event) +{ + Mutex::ScopedLock lock(agentLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + event.encode(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.event"); +} + uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) { Mutex::ScopedLock lock(agentLock); @@ -184,10 +208,12 @@ void ManagementAgentImpl::startProtocol() char rawbuffer[512]; Buffer buffer(rawbuffer, 512); - EncodeHeader(buffer, 'A'); + connected = true; + encodeHeader(buffer, 'A'); buffer.putShortString("RemoteAgent [C++]"); systemId.encode (buffer); - buffer.putLong(requestedBank); + buffer.putLong(requestedBrokerBank); + buffer.putLong(requestedAgentBank); uint32_t length = 512 - buffer.available(); buffer.reset(); connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); @@ -197,10 +223,12 @@ void ManagementAgentImpl::storeData(bool requested) { if (!storeFile.empty()) { ofstream outFile(storeFile.c_str()); - uint32_t bankToWrite = requested ? requestedBank : assignedBank; + uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank; + uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank; if (outFile.good()) { - outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl; + outFile << storeMagicNumber << " " << brokerBankToWrite << " " << + agentBankToWrite << " " << bootSequence << endl; outFile.close(); } } @@ -215,7 +243,8 @@ void ManagementAgentImpl::retrieveData() if (inFile.good()) { inFile >> mn; if (mn == storeMagicNumber) { - inFile >> requestedBank; + inFile >> requestedBrokerBank; + inFile >> requestedAgentBank; inFile >> bootSequence; } inFile.close(); @@ -229,7 +258,7 @@ void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequen Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'z', sequence); + encodeHeader(outBuffer, 'z', sequence); outBuffer.putLong(code); outBuffer.putShortString(text); outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -241,20 +270,23 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); - brokerBank = inBuffer.getLong(); - assignedBank = inBuffer.getLong(); - if (assignedBank != requestedBank) { - if (requestedBank == 0) - cout << "Initial object-id bank assigned: " << assignedBank << endl; + assignedBrokerBank = inBuffer.getLong(); + assignedAgentBank = inBuffer.getLong(); + if ((assignedBrokerBank != requestedBrokerBank) || + (assignedAgentBank != requestedAgentBank)) { + if (requestedAgentBank == 0) + cout << "Initial object-id bank assigned: " << assignedBrokerBank << "." << + assignedAgentBank << endl; else - cout << "Collision in object-id! New bank assigned: " << assignedBank << endl; + cout << "Collision in object-id! New bank assigned: " << assignedBrokerBank << + "." << assignedAgentBank << endl; storeData(); } - attachment.setBanks(brokerBank, assignedBank); + attachment.setBanks(assignedBrokerBank, assignedAgentBank); // Bind to qpid.management to receive commands - connThreadBody.bindToBank(assignedBank); + connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank); // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); @@ -263,8 +295,8 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'p'); - EncodePackageIndication(outBuffer, pIter); + encodeHeader(outBuffer, 'p'); + encodePackageIndication(outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); @@ -273,8 +305,8 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) ClassMap cMap = pIter->second; for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { outBuffer.reset(); - EncodeHeader(outBuffer, 'q'); - EncodeClassIndication(outBuffer, pIter, cIter); + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); @@ -294,14 +326,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { - ClassMap cMap = pIter->second; + ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - SchemaClass schema = cIter->second; + SchemaClass& schema = cIter->second; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 's', sequence); + encodeHeader(outBuffer, 's', sequence); schema.writeSchemaCall(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); @@ -331,7 +363,7 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { @@ -344,7 +376,14 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); } else - iter->second->doMethod(methodName, inBuffer, outBuffer); + try { + outBuffer.record(); + iter->second->doMethod(methodName, inBuffer, outBuffer); + } catch(std::exception& e) { + outBuffer.restore(); + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putShortString(e.what()); + } } outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -379,7 +418,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'g', sequence); + encodeHeader(outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -419,7 +458,7 @@ void ManagementAgentImpl::received(Message& msg) replyToKey = rt.getRoutingKey(); } - if (CheckHeader(inBuffer, &opcode, &sequence)) + if (checkHeader(inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); @@ -429,16 +468,16 @@ void ManagementAgentImpl::received(Message& msg) } } -void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet('A'); buf.putOctet('M'); - buf.putOctet('1'); + buf.putOctet('2'); buf.putOctet(opcode); buf.putLong (seq); } -bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) return false; @@ -450,10 +489,10 @@ bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *se *opcode = buf.getOctet(); *seq = buf.getLong(); - return h1 == 'A' && h2 == 'M' && h3 == '1'; + return h1 == 'A' && h2 == 'M' && h3 == '2'; } -ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name) +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name) { PackageMap::iterator pIter = packages.find(name); if (pIter != packages.end()) @@ -467,8 +506,8 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage( Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'p'); - EncodePackageIndication(outBuffer, result.first); + encodeHeader(outBuffer, 'p'); + encodePackageIndication(outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package"); @@ -486,8 +525,9 @@ void ManagementAgentImpl::moveNewObjectsLH() newManagementObjects.clear(); } -void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter, - string className, +void ManagementAgentImpl::addClassLocal(uint8_t classKind, + PackageMap::iterator pIter, + const string& className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall) { @@ -502,32 +542,28 @@ void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter, return; // No such class found, create a new class with local information. - SchemaClass classInfo; - - classInfo.writeSchemaCall = schemaCall; - cMap[key] = classInfo; - - // TODO: Publish a class-indication message + cMap.insert(std::pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind))); } -void ManagementAgentImpl::EncodePackageIndication(Buffer& buf, +void ManagementAgentImpl::encodePackageIndication(Buffer& buf, PackageMap::iterator pIter) { buf.putShortString((*pIter).first); } -void ManagementAgentImpl::EncodeClassIndication(Buffer& buf, +void ManagementAgentImpl::encodeClassIndication(Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; + buf.putOctet((*cIter).second.kind); buf.putShortString((*pIter).first); buf.putShortString(key.name); - buf.putBin128 (key.hash); + buf.putBin128(key.hash); } -void ManagementAgentImpl::PeriodicProcessing() +void ManagementAgentImpl::periodicProcessing() { #define BUFSIZE 65536 Mutex::ScopedLock lock(agentLock); @@ -536,9 +572,12 @@ void ManagementAgentImpl::PeriodicProcessing() string routingKey; std::list<ObjectId> deleteList; + if (!connected) + return; + { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'h'); + encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); contentSize = BUFSIZE - msgBuffer.available(); @@ -573,7 +612,7 @@ void ManagementAgentImpl::PeriodicProcessing() if (object->getConfigChanged() || object->isDeleted()) { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'c'); + encodeHeader(msgBuffer, 'c'); object->writeProperties(msgBuffer); contentSize = BUFSIZE - msgBuffer.available(); @@ -585,7 +624,7 @@ void ManagementAgentImpl::PeriodicProcessing() if (object->getInstChanged()) { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'i'); + encodeHeader(msgBuffer, 'i'); object->writeStatistics(msgBuffer); contentSize = BUFSIZE - msgBuffer.available(); @@ -664,8 +703,8 @@ ManagementAgentImpl::ConnectionThread::~ConnectionThread() void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, uint32_t length, - string exchange, - string routingKey) + const string& exchange, + const string& routingKey) { { Mutex::ScopedLock _lock(connLock); @@ -683,10 +722,10 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, session.messageTransfer(arg::content=msg, arg::destination=exchange); } -void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank) +void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) { stringstream key; - key << "agent." << agentBank; + key << "agent." << brokerBank << "." << agentBank; session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(), arg::bindingKey=key.str()); } @@ -695,28 +734,7 @@ void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank) void ManagementAgentImpl::PublishThread::run() { while (true) { - ::sleep(5); - agent.PeriodicProcessing(); + ::sleep(agent.getInterval()); + agent.periodicProcessing(); } } - -Mutex& ManagementAgentImpl::getMutex() -{ - return agentLock; -} - -Buffer* ManagementAgentImpl::startEventLH() -{ - Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); - EncodeHeader(*outBuffer, 'e'); - outBuffer->putLongLong(uint64_t(Duration(now()))); - return outBuffer; -} - -void ManagementAgentImpl::finishEventLH(Buffer* outBuffer) -{ - uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); - outBuffer->reset(); - connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event"); - delete outBuffer; -} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 7d9be6daf9..a964694690 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -43,24 +43,34 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen public: ManagementAgentImpl(); - virtual ~ManagementAgentImpl(); + virtual ~ManagementAgentImpl() {}; + // + // Methods from ManagementAgent + // int getMaxThreads() { return 1; } void init(std::string brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, bool useExternalThread = false, std::string storeFile = ""); - void RegisterClass(std::string packageName, - std::string className, - uint8_t* md5Sum, + bool isConnected() { return connected; } + std::string& getLastFailure() { return lastFailure; } + void registerClass(std::string& packageName, + std::string& className, + uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); - ObjectId addObject (management::ManagementObject* objectPtr, - uint64_t persistId = 0); - uint32_t pollCallbacks (uint32_t callLimit = 0); - int getSignalFd (void); + void registerEvent(std::string& packageName, + std::string& eventName, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0); + void raiseEvent(const management::ManagementEvent& event); + uint32_t pollCallbacks(uint32_t callLimit = 0); + int getSignalFd(); - void PeriodicProcessing(); + uint16_t getInterval() { return interval; } + void periodicProcessing(); private: @@ -84,8 +94,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen struct SchemaClass { management::ManagementObject::writeSchemaCall_t writeSchemaCall; + uint8_t kind; - SchemaClass () : writeSchemaCall(0) {} + SchemaClass(const management::ManagementObject::writeSchemaCall_t call, + const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {} }; struct QueuedMethod { @@ -120,12 +132,16 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen framing::Uuid systemId; std::string host; uint16_t port; + bool connected; + std::string lastFailure; + + bool clientWasAdded; + uint32_t requestedBrokerBank; + uint32_t requestedAgentBank; + uint32_t assignedBrokerBank; + uint32_t assignedAgentBank; + uint16_t bootSequence; - bool clientWasAdded; - uint32_t requestedBank; - uint32_t assignedBank; - uint32_t brokerBank; - uint16_t bootSequence; # define MA_BUFFER_SIZE 65536 char outputBuffer[MA_BUFFER_SIZE]; char eventBuffer[MA_BUFFER_SIZE]; @@ -148,9 +164,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen ~ConnectionThread(); void sendBuffer(qpid::framing::Buffer& buf, uint32_t length, - std::string exchange, - std::string routingKey); - void bindToBank(uint32_t agentBank); + const std::string& exchange, + const std::string& routingKey); + void bindToBank(uint32_t brokerBank, uint32_t agentBank); }; class PublishThread : public sys::Runnable @@ -171,19 +187,20 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void startProtocol(); void storeData(bool requested=false); void retrieveData(); - PackageMap::iterator FindOrAddPackage (std::string name); + PackageMap::iterator findOrAddPackage(const std::string& name); void moveNewObjectsLH(); - void AddClassLocal (PackageMap::iterator pIter, - std::string className, + void addClassLocal (uint8_t classKind, + PackageMap::iterator pIter, + const std::string& className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); - void EncodePackageIndication (qpid::framing::Buffer& buf, + void encodePackageIndication (framing::Buffer& buf, PackageMap::iterator pIter); - void EncodeClassIndication (qpid::framing::Buffer& buf, + void encodeClassIndication (framing::Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter); - void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void sendCommandComplete (std::string replyToKey, uint32_t sequence, uint32_t code = 0, std::string text = std::string("OK")); void handleAttachResponse (qpid::framing::Buffer& inBuffer); @@ -194,9 +211,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); - sys::Mutex& getMutex(); - framing::Buffer* startEventLH(); - void finishEventLH(framing::Buffer* outBuffer); }; }} diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 38affad51f..3ed41a3f2d 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -72,6 +72,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (agent != 0) mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); agent->addObject(mgmtObject); + ConnectionState::setUrl(mgmtId); } } diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index aac31bbf96..c04bd46f72 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -63,6 +63,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable virtual void setUserId(const string& uid) { userId = uid; } const string& getUserId() const { return userId; } + void setUrl(const string& _url) { url = _url; } + const string& getUrl() const { return url; } + void setFederationLink(bool b) { federationLink = b; } bool isFederationLink() const { return federationLink; } @@ -85,6 +88,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable uint16_t heartbeat; uint64_t stagingThreshold; string userId; + string url; bool federationLink; }; diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 64b0b80446..9c23abeba9 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -26,6 +26,15 @@ #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/exceptions.h" #include "qpid/framing/SequenceSet.h" +#include "qpid/agent/ManagementAgent.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventUnbind.h" +#include "qmf/org/apache/qpid/broker/EventSubscribe.h" +#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h" #include <boost/format.hpp> #include <boost/cast.hpp> #include <boost/bind.hpp> @@ -36,6 +45,8 @@ namespace broker { using namespace qpid; using namespace qpid::framing; using namespace qpid::framing::dtx; +using namespace qpid::management; +namespace _qmf = qmf::org::apache::qpid::broker; typedef std::vector<Queue::shared_ptr> QueueVector; @@ -54,18 +65,17 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const const string& alternateExchange, bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ - AclModule* acl = getBroker().getAcl(); - if (acl) - { + AclModule* acl = getBroker().getAcl(); + if (acl) { std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::TYPE, type)); - params.insert(make_pair(acl::ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PASSIVE, std::string(passive ? "true" : "false") )); - params.insert(make_pair(acl::DURABLE, std::string(durable ? "true" : "false"))); - if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchange,¶ms) ) - throw NotAllowedException("ACL denied exhange declare request"); - } - + params.insert(make_pair(acl::TYPE, type)); + params.insert(make_pair(acl::ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PASSIVE, std::string(passive ? "true" : "false") )); + params.insert(make_pair(acl::DURABLE, std::string(durable ? "true" : "false"))); + if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchange,¶ms) ) + throw NotAllowedException("ACL denied exhange declare request"); + } + //TODO: implement autoDelete Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { @@ -90,6 +100,13 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const checkType(response.first, type); checkAlternate(response.first, alternate); } + + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + if (agent) + agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, + alternateExchange, durable, false, args, + response.second ? "created" : "existing")); + }catch(UnknownExchangeTypeException& e){ throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); } @@ -106,38 +123,37 @@ void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchang void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) { if (alternate && alternate != exchange->getAlternate()) - throw NotAllowedException( - QPID_MSG("Exchange declared with alternate-exchange " - << exchange->getAlternate()->getName() << ", requested " - << alternate->getName())); + throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange " + << exchange->getAlternate()->getName() << ", requested " + << alternate->getName())); } -void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/){ - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::EXCHANGE,name,NULL) ) - throw NotAllowedException("ACL denied exhange delete request"); +void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/) +{ + AclModule* acl = getBroker().getAcl(); + if (acl) { + if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::EXCHANGE,name,NULL) ) + throw NotAllowedException("ACL denied exhange delete request"); } - //TODO: implement unused Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); getBroker().getExchanges().destroy(name); -} + + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + if (agent) + agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name)); +} ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) { - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::EXCHANGE,name,NULL) ) - throw NotAllowedException("ACL denied exhange query request"); + AclModule* acl = getBroker().getAcl(); + if (acl) { + if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::EXCHANGE,name,NULL) ) + throw NotAllowedException("ACL denied exhange query request"); } try { @@ -147,15 +163,15 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam return ExchangeQueryResult("", false, true, FieldTable()); } } + void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, const string& exchangeName, const string& routingKey, - const FieldTable& arguments){ - - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::BIND,acl::EXCHANGE,exchangeName,routingKey) ) - throw NotAllowedException("ACL denied exhange bind request"); + const FieldTable& arguments) +{ + AclModule* acl = getBroker().getAcl(); + if (acl) { + if (!acl->authorise(getConnection().getUserId(),acl::BIND,acl::EXCHANGE,exchangeName,routingKey) ) + throw NotAllowedException("ACL denied exhange bind request"); } Queue::shared_ptr queue = getQueue(queueName); @@ -167,30 +183,29 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, if (exchange->isDurable() && queue->isDurable()) { getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); } + + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + if (agent) + agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments)); } }else{ - throw NotFoundException( - "Bind failed. No such exchange: " + exchangeName); + throw NotFoundException("Bind failed. No such exchange: " + exchangeName); } } -void -SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, - const string& exchangeName, - const string& routingKey) +void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, + const string& exchangeName, + const string& routingKey) { - - AclModule* acl = getBroker().getAcl(); - if (acl) - { + AclModule* acl = getBroker().getAcl(); + if (acl) { std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::QUEUENAME, queueName)); - params.insert(make_pair(acl::ROUTINGKEY, routingKey)); - if (!acl->authorise(getConnection().getUserId(),acl::UNBIND,acl::EXCHANGE,exchangeName,¶ms) ) - throw NotAllowedException("ACL denied exchange unbind request"); + params.insert(make_pair(acl::QUEUENAME, queueName)); + params.insert(make_pair(acl::ROUTINGKEY, routingKey)); + if (!acl->authorise(getConnection().getUserId(),acl::UNBIND,acl::EXCHANGE,exchangeName,¶ms) ) + throw NotAllowedException("ACL denied exchange unbind request"); } - Queue::shared_ptr queue = getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); @@ -198,10 +213,14 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); //TODO: revise unbind to rely solely on binding key (not args) - if (exchange->unbind(queue, routingKey, 0) && exchange->isDurable() && queue->isDurable()) { - getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); - } + if (exchange->unbind(queue, routingKey, 0)) { + if (exchange->isDurable() && queue->isDurable()) + getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + if (agent) + agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey)); + } } ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, @@ -209,16 +228,15 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string const std::string& key, const framing::FieldTable& args) { - AclModule* acl = getBroker().getAcl(); - if (acl) - { + AclModule* acl = getBroker().getAcl(); + if (acl) { std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::QUEUENAME, queueName)); - params.insert(make_pair(acl::ROUTINGKEY, key)); - if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchangeName,¶ms) ) - throw NotAllowedException("ACL denied exhange bound request"); + params.insert(make_pair(acl::QUEUENAME, queueName)); + params.insert(make_pair(acl::ROUTINGKEY, key)); + if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchangeName,¶ms) ) + throw NotAllowedException("ACL denied exhange bound request"); } - + Exchange::shared_ptr exchange; try { exchange = getBroker().getExchanges().get(exchangeName); @@ -279,13 +297,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::QUEUE,name,NULL) ) - throw NotAllowedException("ACL denied queue query request"); + AclModule* acl = getBroker().getAcl(); + if (acl) { + if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::QUEUE,name,NULL) ) + throw NotAllowedException("ACL denied queue query request"); } - + Queue::shared_ptr queue = session.getBroker().getQueues().find(name); if (queue) { @@ -305,20 +322,19 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) } void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange, - bool passive, bool durable, bool exclusive, - bool autoDelete, const qpid::framing::FieldTable& arguments){ - - AclModule* acl = getBroker().getAcl(); - if (acl) - { + bool passive, bool durable, bool exclusive, + bool autoDelete, const qpid::framing::FieldTable& arguments) +{ + AclModule* acl = getBroker().getAcl(); + if (acl) { std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PASSIVE, std::string(passive ? "true" : "false") )); - params.insert(make_pair(acl::DURABLE, std::string(durable ? "true" : "false"))); - params.insert(make_pair(acl::EXCLUSIVE, std::string(exclusive ? "true" : "false"))); - params.insert(make_pair(acl::AUTODELETE, std::string(autoDelete ? "true" : "false"))); - if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::QUEUE,name,¶ms) ) - throw NotAllowedException("ACL denied queue create request"); + params.insert(make_pair(acl::ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PASSIVE, std::string(passive ? "true" : "false") )); + params.insert(make_pair(acl::DURABLE, std::string(durable ? "true" : "false"))); + params.insert(make_pair(acl::EXCLUSIVE, std::string(exclusive ? "true" : "false"))); + params.insert(make_pair(acl::AUTODELETE, std::string(autoDelete ? "true" : "false"))); + if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::QUEUE,name,¶ms) ) + throw NotAllowedException("ACL denied queue create request"); } Exchange::shared_ptr alternate; @@ -327,17 +343,16 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = getQueue(name); + queue = getQueue(name); //TODO: check alternate-exchange is as expected } else { - std::pair<Queue::shared_ptr, bool> queue_created = - getBroker().getQueues().declare( - name, durable, - autoDelete, - exclusive ? this : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue + std::pair<Queue::shared_ptr, bool> queue_created = + getBroker().getQueues().declare(name, durable, + autoDelete, + exclusive ? this : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue if (alternate) { queue->setAlternateExchange(alternate); alternate->incAlternateUsers(); @@ -346,44 +361,50 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& //apply settings & create persistent record if required queue_created.first->create(arguments); - //add default binding: - getBroker().getExchanges().getDefault()->bind(queue, name, 0); + //add default binding: + getBroker().getExchanges().getDefault()->bind(queue, name, 0); queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); //handle automatic cleanup: - if (exclusive) { - exclusiveQueues.push_back(queue); - } - } else { + if (exclusive) { + exclusiveQueues.push_back(queue); + } + } else { if (exclusive && queue->setExclusiveOwner(this)) { - exclusiveQueues.push_back(queue); + exclusiveQueues.push_back(queue); } } + + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + if (agent) + agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), + name, durable, exclusive, autoDelete, arguments, + queue_created.second ? "created" : "existing")); } + if (exclusive && !queue->isExclusiveOwner(this)) - throw ResourceLockedException( - QPID_MSG("Cannot grant exclusive access to queue " - << queue->getName())); + throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue " + << queue->getName())); } void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::PURGE,acl::QUEUE,queue,NULL) ) - throw NotAllowedException("ACL denied queue purge request"); + AclModule* acl = getBroker().getAcl(); + if (acl) + { + if (!acl->authorise(getConnection().getUserId(),acl::PURGE,acl::QUEUE,queue,NULL) ) + throw NotAllowedException("ACL denied queue purge request"); } getQueue(queue)->purge(); } void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ - AclModule* acl = getBroker().getAcl(); - if (acl) - { - if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::QUEUE,queue,NULL) ) - throw NotAllowedException("ACL denied queue delete request"); + AclModule* acl = getBroker().getAcl(); + if (acl) + { + if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::QUEUE,queue,NULL) ) + throw NotAllowedException("ACL denied queue delete request"); } ChannelException error(0, ""); @@ -401,6 +422,10 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse q->destroy(); getBroker().getQueues().destroy(queue); q->unbind(getBroker().getExchanges(), q); + + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + if (agent) + agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue)); } } @@ -441,12 +466,12 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, const FieldTable& arguments) { - AclModule* acl = getBroker().getAcl(); - if (acl) - { - // add flags as needed - if (!acl->authorise(getConnection().getUserId(),acl::CONSUME,acl::QUEUE,queueName,NULL) ) - throw NotAllowedException("ACL denied Queue subscribe request"); + AclModule* acl = getBroker().getAcl(); + if (acl) + { + // add flags as needed + if (!acl->authorise(getConnection().getUserId(),acl::CONSUME,acl::QUEUE,queueName,NULL) ) + throw NotAllowedException("ACL denied Queue subscribe request"); } Queue::shared_ptr queue = getQueue(queueName); @@ -457,12 +482,21 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode), tag, queue, false, //TODO get rid of no-local acceptMode == 0, acquireMode == 0, exclusive, &arguments); + + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + if (agent) + agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), + queueName, destination, exclusive, arguments)); } void SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) { state.cancel(destination); + + ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + if (agent) + agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination)); } void diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h index 4f482dc206..828e6e963a 100644 --- a/cpp/src/qpid/framing/Buffer.h +++ b/cpp/src/qpid/framing/Buffer.h @@ -28,7 +28,9 @@ namespace qpid { namespace framing { -struct OutOfBounds : qpid::Exception {}; +struct OutOfBounds : qpid::Exception { + OutOfBounds() : qpid::Exception(std::string("Out of Bounds")) {} +}; class Content; class FieldTable; diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h index afa5c968f7..b4d80d8fad 100644 --- a/cpp/src/qpid/management/Manageable.h +++ b/cpp/src/qpid/management/Manageable.h @@ -45,6 +45,7 @@ class Manageable static const status_t STATUS_INVALID_PARAMETER = 4; static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5; static const status_t STATUS_FORBIDDEN = 6; + static const status_t STATUS_EXCEPTION = 7; static const status_t STATUS_USER = 0x00010000; // Every "Manageable" object must hold a reference to exactly one diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 3ae98e8264..0e046bb813 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -179,14 +179,24 @@ void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchang dExchange = _dexchange; } -void ManagementBroker::RegisterClass (string packageName, - string className, +void ManagementBroker::registerClass (string& packageName, + string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(userLock); - PackageMap::iterator pIter = FindOrAddPackageLH(packageName); - AddClass(pIter, className, md5Sum, schemaCall); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); +} + +void ManagementBroker::registerEvent (string& packageName, + string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } ObjectId ManagementBroker::addObject (ManagementObject* object, @@ -211,6 +221,23 @@ ObjectId ManagementBroker::addObject (ManagementObject* object, return objId; } +void ManagementBroker::raiseEvent(const ManagementEvent& event) +{ + Mutex::ScopedLock lock (userLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + event.encode(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, "mgmt.event"); +} + ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {} @@ -219,7 +246,7 @@ ManagementBroker::Periodic::~Periodic () {} void ManagementBroker::Periodic::fire () { broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval))); - broker.PeriodicProcessing (); + broker.periodicProcessing (); } void ManagementBroker::clientAdded (void) @@ -233,35 +260,35 @@ void ManagementBroker::clientAdded (void) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'x'); + encodeHeader (outBuffer, 'x'); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); + sendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); } } -void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); buf.putOctet ('M'); - buf.putOctet ('1'); + buf.putOctet ('2'); buf.putOctet (opcode); buf.putLong (seq); } -bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) { - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); - *opcode = buf.getOctet (); - *seq = buf.getLong (); + *opcode = buf.getOctet(); + *seq = buf.getLong(); - return h1 == 'A' && h2 == 'M' && h3 == '1'; + return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementBroker::SendBuffer (Buffer& buf, +void ManagementBroker::sendBuffer (Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, string routingKey) @@ -304,7 +331,7 @@ void ManagementBroker::moveNewObjectsLH() newManagementObjects.clear(); } -void ManagementBroker::PeriodicProcessing (void) +void ManagementBroker::periodicProcessing (void) { #define BUFSIZE 65536 Mutex::ScopedLock lock (userLock); @@ -315,13 +342,13 @@ void ManagementBroker::PeriodicProcessing (void) { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'h'); + encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str() + ".heartbeat"; - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } moveNewObjectsLH(); @@ -350,25 +377,25 @@ void ManagementBroker::PeriodicProcessing (void) if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); + encodeHeader (msgBuffer, 'c'); object->writeProperties(msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); + encodeHeader (msgBuffer, 'i'); object->writeStatistics(msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->isDeleted ()) @@ -393,12 +420,12 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'z', sequence); + encodeHeader (outBuffer, 'z', sequence); outBuffer.putLong (code); outBuffer.putShortString (text); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } bool ManagementBroker::dispatchCommand (Deliverable& deliverable, @@ -411,7 +438,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, // Parse the routing key. This management broker should act as though it // is bound to the exchange to match the following keys: // - // agent.0.# + // agent.1.0.# // broker if (routingKey == "broker") { @@ -419,12 +446,12 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, return false; } - else if (routingKey.compare(0, 7, "agent.0") == 0) { + else if (routingKey.compare(0, 9, "agent.1.0") == 0) { dispatchAgentCommandLH(msg); return false; } - else if (routingKey.compare(0, 6, "agent.") == 0) { + else if (routingKey.compare(0, 8, "agent.1.") == 0) { return authorizeAgentMessageLH(msg); } @@ -447,7 +474,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getShortString(className); inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); if (acl != 0) { string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); @@ -460,7 +487,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); return; } } @@ -476,12 +503,19 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); } else - iter->second->doMethod(methodName, inBuffer, outBuffer); + try { + outBuffer.record(); + iter->second->doMethod(methodName, inBuffer, outBuffer); + } catch(std::exception& e) { + outBuffer.restore(); + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putShortString(e.what()); + } } outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -489,12 +523,12 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32 Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'b', sequence); + encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) @@ -506,11 +540,11 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p', sequence); - EncodePackageIndication (outBuffer, pIter); + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } sendCommandComplete (replyToKey, sequence); @@ -521,7 +555,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey std::string packageName; inBuffer.getShortString(packageName); - FindOrAddPackageLH(packageName); + findOrAddPackageLH(packageName); } void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -542,11 +576,11 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'q', sequence); - EncodeClassIndication(outBuffer, pIter, cIter); + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } } } @@ -558,26 +592,27 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui std::string packageName; SchemaClassKey key; + uint8_t kind = inBuffer.getOctet(); inBuffer.getShortString(packageName); inBuffer.getShortString(key.name); inBuffer.getBin128(key.hash); - PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); ClassMap::iterator cIter = pIter->second.find(key); if (cIter == pIter->second.end()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; uint32_t sequence = nextRequestSequence++; - EncodeHeader (outBuffer, 'S', sequence); + encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); outBuffer.putShortString(key.name); outBuffer.putBin128(key.hash); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); - pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence))); + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence))); } } @@ -612,11 +647,11 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey SchemaClass& classInfo = cIter->second; if (classInfo.hasSchema()) { - EncodeHeader(outBuffer, 's', sequence); + encodeHeader(outBuffer, 's', sequence); classInfo.appendSchema(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } else sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); @@ -634,9 +669,10 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo SchemaClassKey key; inBuffer.record(); - inBuffer.getShortString (packageName); - inBuffer.getShortString (key.name); - inBuffer.getBin128 (key.hash); + inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); inBuffer.restore(); PackageMap::iterator pIter = packages.find(packageName); @@ -644,7 +680,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { - size_t length = ValidateSchema(inBuffer); + size_t length = validateSchema(inBuffer, cIter->second.kind); if (length == 0) { QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); @@ -658,11 +694,11 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'q'); - EncodeClassIndication(outBuffer, pIter, cIter); + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + sendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); } } } @@ -727,7 +763,7 @@ void ManagementBroker::deleteOrphanedAgentsLH() void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; - uint32_t requestedBank; + uint32_t requestedBrokerBank, requestedAgentBank; uint32_t assignedBank; ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; @@ -737,14 +773,15 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent"); + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); return; } - inBuffer.getShortString (label); - systemId.decode (inBuffer); - requestedBank = inBuffer.getLong (); - assignedBank = assignBankLH (requestedBank); + inBuffer.getShortString(label); + systemId.decode(inBuffer); + requestedBrokerBank = inBuffer.getLong(); + requestedAgentBank = inBuffer.getLong(); + assignedBank = assignBankLH(requestedAgentBank); RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; @@ -755,7 +792,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); - agent->mgmtObject->set_objectIdBank (assignedBank); + agent->mgmtObject->set_brokerBank (brokerBank); + agent->mgmtObject->set_agentBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[connectionRef] = agent; @@ -764,12 +802,12 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'a', sequence); + encodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -799,12 +837,12 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'g', sequence); + encodeHeader (outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } } @@ -824,7 +862,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg) msg.encodeContent(inBuffer); inBuffer.reset(); - if (!CheckHeader(inBuffer, &opcode, &sequence)) + if (!checkHeader(inBuffer, &opcode, &sequence)) return false; if (opcode == 'M') { @@ -861,12 +899,12 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg) Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } return false; @@ -900,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) msg.encodeContent(inBuffer); inBuffer.reset(); - if (!CheckHeader(inBuffer, &opcode, &sequence)) + if (!checkHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); @@ -915,7 +953,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); } -ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) +ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(std::string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) @@ -930,19 +968,20 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std: Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); + encodeHeader (outBuffer, 'p'); + encodePackageIndication (outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); + sendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); return result.first; } -void ManagementBroker::AddClass(PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) +void ManagementBroker::addClassLH(uint8_t kind, + PackageMap::iterator pIter, + string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -958,71 +997,76 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter, QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall))); + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); cIter = cMap.find(key); } -void ManagementBroker::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) +void ManagementBroker::encodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) { - buf.putShortString ((*pIter).first); + buf.putShortString((*pIter).first); } -void ManagementBroker::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) +void ManagementBroker::encodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); + buf.putOctet((*cIter).second.kind); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128(key.hash); +} + +size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind) +{ + if (kind == ManagementItem::CLASS_KIND_TABLE) + return validateTableSchema(inBuffer); + else if (kind == ManagementItem::CLASS_KIND_EVENT) + return validateEventSchema(inBuffer); + return 0; } -size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) +size_t ManagementBroker::validateTableSchema(Buffer& inBuffer) { uint32_t start = inBuffer.getPosition(); uint32_t end; string text; uint8_t hash[16]; - inBuffer.record(); - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_TABLE) + return 0; - uint16_t propCount = inBuffer.getShort(); - uint16_t statCount = inBuffer.getShort(); - uint16_t methCount = inBuffer.getShort(); - uint16_t evntCount = inBuffer.getShort(); + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); - for (uint16_t idx = 0; idx < propCount + statCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - } + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); - for (uint16_t idx = 0; idx < methCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - if (!ft.isSet("argCount")) - return 0; - int argCount = ft.getInt("argCount"); - for (int mIdx = 0; mIdx < argCount; mIdx++) { - FieldTable aft; - aft.decode(inBuffer); + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); } - } - for (uint16_t idx = 0; idx < evntCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - if (!ft.isSet("argCount")) - return 0; - int argCount = ft.getInt("argCount"); - for (int mIdx = 0; mIdx < argCount; mIdx++) { - FieldTable aft; - aft.decode(inBuffer); + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } } + } catch (std::exception& e) { + return 0; } end = inBuffer.getPosition(); @@ -1030,24 +1074,34 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) return end - start; } -Mutex& ManagementBroker::getMutex() +size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) { - return userLock; -} + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; -Buffer* ManagementBroker::startEventLH() -{ - Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); - EncodeHeader(*outBuffer, 'e'); - outBuffer->putLongLong(uint64_t(Duration(now()))); - return outBuffer; -} + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_EVENT) + return 0; -void ManagementBroker::finishEventLH(Buffer* outBuffer) -{ - uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); - outBuffer->reset(); - SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event"); - delete outBuffer; -} + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t argCount = inBuffer.getShort(); + for (uint16_t idx = 0; idx < argCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + } catch (std::exception& e) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 23fba74b83..c0e0c50963 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -38,11 +38,11 @@ namespace management { class ManagementBroker : public ManagementAgent { - private: +private: int threadPoolSize; - public: +public: ManagementBroker (); virtual ~ManagementBroker (); @@ -52,13 +52,18 @@ class ManagementBroker : public ManagementAgent void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); int getMaxThreads () { return threadPoolSize; } - void RegisterClass (std::string packageName, - std::string className, + void registerClass (std::string& packageName, + std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void registerEvent (std::string& packageName, + std::string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); ObjectId addObject (ManagementObject* object, uint64_t persistId = 0); - void clientAdded (void); + void raiseEvent(const ManagementEvent& event); + void clientAdded (); bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -68,7 +73,7 @@ class ManagementBroker : public ManagementAgent uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } int getSignalFd () { assert(0); return -1; } - private: +private: friend class ManagementAgent; struct Periodic : public qpid::broker::TimerTask @@ -127,15 +132,16 @@ class ManagementBroker : public ManagementAgent struct SchemaClass { + uint8_t kind; ManagementObject::writeSchemaCall_t writeSchemaCall; uint32_t pendingSequence; size_t bufferLen; uint8_t* buffer; - SchemaClass(uint32_t seq) : - writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} - SchemaClass(ManagementObject::writeSchemaCall_t call) : - writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} + SchemaClass(uint8_t _kind, uint32_t seq) : + kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} + SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : + kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } void appendSchema (framing::Buffer& buf); }; @@ -154,12 +160,12 @@ class ManagementBroker : public ManagementAgent framing::Uuid uuid; sys::Mutex addLock; sys::Mutex userLock; - qpid::broker::Timer timer; + qpid::broker::Timer timer; qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; std::string dataDir; uint16_t interval; - qpid::broker::Broker* broker; + qpid::broker::Broker* broker; uint16_t bootSequence; uint32_t nextObjectId; uint32_t brokerBank; @@ -173,10 +179,10 @@ class ManagementBroker : public ManagementAgent char eventBuffer[MA_BUFFER_SIZE]; void writeData (); - void PeriodicProcessing (void); - void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void SendBuffer (framing::Buffer& buf, + void periodicProcessing (void); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void sendBuffer (framing::Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, std::string routingKey); @@ -185,14 +191,15 @@ class ManagementBroker : public ManagementAgent bool authorizeAgentMessageLH(qpid::broker::Message& msg); void dispatchAgentCommandLH(qpid::broker::Message& msg); - PackageMap::iterator FindOrAddPackageLH(std::string name); - void AddClass(PackageMap::iterator pIter, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void EncodePackageIndication (framing::Buffer& buf, + PackageMap::iterator findOrAddPackageLH(std::string name); + void addClassLH(uint8_t kind, + PackageMap::iterator pIter, + std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void encodePackageIndication (framing::Buffer& buf, PackageMap::iterator pIter); - void EncodeClassIndication (framing::Buffer& buf, + void encodeClassIndication (framing::Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter); bool bankInUse (uint32_t bank); @@ -212,10 +219,9 @@ class ManagementBroker : public ManagementAgent void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - size_t ValidateSchema(framing::Buffer&); - sys::Mutex& getMutex(); - framing::Buffer* startEventLH(); - void finishEventLH(framing::Buffer* outBuffer); + size_t validateSchema(framing::Buffer&, uint8_t kind); + size_t validateTableSchema(framing::Buffer&); + size_t validateEventSchema(framing::Buffer&); }; }} diff --git a/cpp/src/qpid/management/ManagementEvent.h b/cpp/src/qpid/management/ManagementEvent.h new file mode 100644 index 0000000000..d5a47e9144 --- /dev/null +++ b/cpp/src/qpid/management/ManagementEvent.h @@ -0,0 +1,48 @@ +#ifndef _ManagementEvent_ +#define _ManagementEvent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" +#include <qpid/framing/Buffer.h> +#include <string> + +namespace qpid { +namespace management { + +class ManagementAgent; + +class ManagementEvent : public ManagementItem { +public: + typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&); + virtual ~ManagementEvent() {} + + virtual writeSchemaCall_t getWriteSchemaCall(void) = 0; + virtual std::string& getEventName() const = 0; + virtual std::string& getPackageName() const = 0; + virtual uint8_t* getMd5Sum() const = 0; + virtual void encode(qpid::framing::Buffer&) const = 0; +}; + +}} + +#endif /*!_ManagementEvent_*/ diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index e0386ee057..ce65ae3279 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -84,6 +84,21 @@ void ObjectId::decode(framing::Buffer& buffer) second = buffer.getLongLong(); } +namespace qpid { +namespace management { + +std::ostream& operator<<(std::ostream& out, const ObjectId& i) +{ + out << "[" << ((i.first & 0xF000000000000000LL) >> 60) << + "-" << ((i.first & 0x0FFF000000000000LL) >> 48) << + "-" << ((i.first & 0x0000FFFFF0000000LL) >> 32) << + "-" << (i.first & 0x000000000FFFFFFFLL) << + "-" << i.second << "]"; + return out; +} + +}} + int ManagementObject::nextThreadIndex = 0; void ManagementObject::writeTimestamps (Buffer& buf) @@ -109,18 +124,3 @@ int ManagementObject::getThreadIndex() { } return thisIndex; } - -Mutex& ManagementObject::getMutex() -{ - return agent->getMutex(); -} - -Buffer* ManagementObject::startEventLH() -{ - return agent->startEventLH(); -} - -void ManagementObject::finishEventLH(Buffer* buf) -{ - agent->finishEventLH(buf); -} diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 1b809f5125..3778d66b7e 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -46,7 +46,7 @@ public: class ObjectId { -private: +protected: const AgentAttachment* agent; uint64_t first; uint64_t second; @@ -59,23 +59,11 @@ public: bool operator<(const ObjectId &other) const; void encode(framing::Buffer& buffer); void decode(framing::Buffer& buffer); + friend std::ostream& operator<<(std::ostream&, const ObjectId&); }; -class ManagementObject -{ - protected: - - uint64_t createTime; - uint64_t destroyTime; - ObjectId objectId; - bool configChanged; - bool instChanged; - bool deleted; - Manageable* coreObject; - sys::Mutex accessLock; - ManagementAgent* agent; - int maxThreads; - +class ManagementItem { +public: static const uint8_t TYPE_U8 = 1; static const uint8_t TYPE_U16 = 2; static const uint8_t TYPE_U32 = 3; @@ -107,15 +95,35 @@ class ManagementObject static const uint8_t FLAG_INDEX = 0x02; static const uint8_t FLAG_END = 0x80; - static int nextThreadIndex; + const static uint8_t CLASS_KIND_TABLE = 1; + const static uint8_t CLASS_KIND_EVENT = 2; + + + +public: + virtual ~ManagementItem() {} +}; + +class ManagementObject : public ManagementItem +{ + protected: + + uint64_t createTime; + uint64_t destroyTime; + ObjectId objectId; + bool configChanged; + bool instChanged; + bool deleted; + Manageable* coreObject; + sys::Mutex accessLock; + ManagementAgent* agent; + int maxThreads; + + static int nextThreadIndex; int getThreadIndex(); void writeTimestamps (qpid::framing::Buffer& buf); - sys::Mutex& getMutex(); - framing::Buffer* startEventLH(); - void finishEventLH(framing::Buffer* buf); - public: typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); @@ -129,14 +137,14 @@ class ManagementObject virtual void writeProperties(qpid::framing::Buffer& buf) = 0; virtual void writeStatistics(qpid::framing::Buffer& buf, bool skipHeaders = false) = 0; - virtual void doMethod (std::string methodName, + virtual void doMethod (std::string& methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; virtual void setReference (ObjectId objectId); - virtual std::string& getClassName (void) = 0; - virtual std::string& getPackageName (void) = 0; - virtual uint8_t* getMd5Sum (void) = 0; + virtual std::string& getClassName (void) const = 0; + virtual std::string& getPackageName (void) const = 0; + virtual uint8_t* getMd5Sum (void) const = 0; void setObjectId (ObjectId oid) { objectId = oid; } ObjectId getObjectId (void) { return objectId; } diff --git a/java/client/src/main/java/org/apache/qpid/management/Names.java b/java/client/src/main/java/org/apache/qpid/management/Names.java index 8ab9e13d4f..64e5c0fef2 100644 --- a/java/client/src/main/java/org/apache/qpid/management/Names.java +++ b/java/client/src/main/java/org/apache/qpid/management/Names.java @@ -35,7 +35,7 @@ public interface Names String METHOD_REPLY_QUEUE_PREFIX = "reply."; String AMQ_DIRECT_QUEUE = "amq.direct"; - String AGENT_ROUTING_KEY = "agent.0"; + String AGENT_ROUTING_KEY = "agent.1.0"; String BROKER_ROUTING_KEY = "broker"; @@ -49,4 +49,4 @@ public interface Names String CONFIGURATION_FILE_NAME = "/org/apache/qpid/management/config.xml"; String ARG_COUNT_PARAM_NAME = "argCount"; -}
\ No newline at end of file +} diff --git a/java/client/src/main/java/org/apache/qpid/management/Protocol.java b/java/client/src/main/java/org/apache/qpid/management/Protocol.java index f50a85f28a..185f417448 100644 --- a/java/client/src/main/java/org/apache/qpid/management/Protocol.java +++ b/java/client/src/main/java/org/apache/qpid/management/Protocol.java @@ -27,8 +27,8 @@ package org.apache.qpid.management; */ public interface Protocol { - String MAGIC_NUMBER = "AM1"; + String MAGIC_NUMBER = "AM2"; byte [] METHOD_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"M").getBytes(); byte [] SCHEMA_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"S").getBytes(); -}
\ No newline at end of file +} diff --git a/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java b/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java index 7a0ee556d2..497e264581 100644 --- a/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java +++ b/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java @@ -49,6 +49,10 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler { try { + int classKind = decoder.readUint8(); + if (classKind != 1) { + return; + } String packageName = decoder.readStr8(); String className = decoder.readStr8(); @@ -57,7 +61,7 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler int howManyProperties = decoder.readUint16(); int howManyStatistics = decoder.readUint16(); int howManyMethods = decoder.readUint16(); - int howManyEvents = decoder.readUint16(); + int howManyEvents = 0; // FIXME : Divide between schema error and raw data conversion error!!!! _domainModel.addSchema( @@ -155,4 +159,4 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler } return result; } - }
\ No newline at end of file + } diff --git a/python/commands/qpid-config b/python/commands/qpid-config index 13b489abae..8b011778d6 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -84,8 +84,8 @@ class BrokerManager: self.qmf.delBroker(self.broker) def Overview (self): - exchanges = self.qmf.getObjects(cls="exchange") - queues = self.qmf.getObjects(cls="queue") + exchanges = self.qmf.getObjects(_class="exchange") + queues = self.qmf.getObjects(_class="queue") print "Total Exchanges: %d" % len (exchanges) etype = {} for ex in exchanges: @@ -106,7 +106,7 @@ class BrokerManager: print " non-durable: %d" % (len (queues) - _durable) def ExchangeList (self, filter): - exchanges = self.qmf.getObjects(cls="exchange") + exchanges = self.qmf.getObjects(_class="exchange") print "Durable Type Bindings Exchange Name" print "=======================================================" for ex in exchanges: @@ -114,9 +114,9 @@ class BrokerManager: print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name) def ExchangeListRecurse (self, filter): - exchanges = self.qmf.getObjects(cls="exchange") - bindings = self.qmf.getObjects(cls="binding") - queues = self.qmf.getObjects(cls="queue") + exchanges = self.qmf.getObjects(_class="exchange") + bindings = self.qmf.getObjects(_class="binding") + queues = self.qmf.getObjects(_class="queue") for ex in exchanges: if self.match (ex.name, filter): print "Exchange '%s' (%s)" % (ex.name, ex.type) @@ -130,8 +130,8 @@ class BrokerManager: def QueueList (self, filter): - queues = self.qmf.getObjects(cls="queue") - journals = self.qmf.getObjects(cls="journal") + queues = self.qmf.getObjects(_class="queue") + journals = self.qmf.getObjects(_class="journal") print " Store Size" print "Durable AutoDel Excl Bindings (files x file pages) Queue Name" print "===========================================================================================" @@ -151,9 +151,9 @@ class BrokerManager: YN (q.exclusive), q.bindingCount, q.name) def QueueListRecurse (self, filter): - exchanges = self.qmf.getObjects(cls="exchange") - bindings = self.qmf.getObjects(cls="binding") - queues = self.qmf.getObjects(cls="queue") + exchanges = self.qmf.getObjects(_class="exchange") + bindings = self.qmf.getObjects(_class="binding") + queues = self.qmf.getObjects(_class="queue") for queue in queues: if self.match (queue.name, filter): print "Queue '%s'" % queue.name diff --git a/python/commands/qpid-printevents b/python/commands/qpid-printevents index 970607c797..6efd472221 100755 --- a/python/commands/qpid-printevents +++ b/python/commands/qpid-printevents @@ -30,16 +30,13 @@ class EventConsole(Console): def event(self, broker, event): print event - def heartbeat(self, agent, timestamp): - print "Heartbeat" - ## ## Main Program ## def main(): _usage = "%prog [options] [broker-addr]..." _description = \ -"""Collect and print events from one of more Qpid message brokers. If no broker-addr is +"""Collect and print events from one or more Qpid message brokers. If no broker-addr is supplied, %prog will connect to 'localhost:5672'. broker-addr is of the form: [username/password@] hostname | ip-address [:<port>] ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost diff --git a/python/commands/qpid-route b/python/commands/qpid-route index f9f938cdec..4dadcd543b 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -62,7 +62,7 @@ class RouteManager: self.qmf.delBroker(self.broker) def getLink (self): - links = self.qmf.getObjects(cls="link") + links = self.qmf.getObjects(_class="link") for link in links: if "%s:%d" % (link.host, link.port) == self.src.name (): return link @@ -74,7 +74,7 @@ class RouteManager: print "Linking broker to itself is not permitted" sys.exit(1) - brokers = self.qmf.getObjects(cls="broker") + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() if link != None: @@ -92,7 +92,7 @@ class RouteManager: def DelLink (self, srcBroker): self.src = qmfconsole.BrokerURL(srcBroker) - brokers = self.qmf.getObjects(cls="broker") + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() if link == None: @@ -103,7 +103,7 @@ class RouteManager: print "Close method returned:", res.status, res.text def ListLinks (self): - links = self.qmf.getObjects(cls="link") + links = self.qmf.getObjects(_class="link") if len(links) == 0: print "No Links Found" else: @@ -119,7 +119,7 @@ class RouteManager: if self.dest.name() == self.src.name(): raise Exception("Linking broker to itself is not permitted") - brokers = self.qmf.getObjects(cls="broker") + brokers = self.qmf.getObjects(_class="broker") broker = brokers[0] link = self.getLink() @@ -140,7 +140,7 @@ class RouteManager: if link == None: raise Exception("Protocol Error - Missing link ID") - bridges = self.qmf.getObjects(cls="bridge") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if bridge.linkRef == link.getObjectId() and \ bridge.dest == exchange and bridge.key == routingKey: @@ -164,7 +164,7 @@ class RouteManager: raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name())) sys.exit (0) - bridges = self.qmf.getObjects(cls="bridge") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey: if _verbose: @@ -186,8 +186,8 @@ class RouteManager: raise Exception("Route not found") def ListRoutes (self): - links = self.qmf.getObjects(cls="link") - bridges = self.qmf.getObjects(cls="bridge") + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: myLink = None @@ -199,8 +199,8 @@ class RouteManager: print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key) def ClearAllRoutes (self): - links = self.qmf.getObjects(cls="link") - bridges = self.qmf.getObjects(cls="bridge") + links = self.qmf.getObjects(_class="link") + bridges = self.qmf.getObjects(_class="bridge") for bridge in bridges: if _verbose: @@ -218,7 +218,7 @@ class RouteManager: print "Ok" if _dellink: - links = self.qmf.getObjects(cls="link") + links = self.qmf.getObjects(_class="link") for link in links: if _verbose: print "Deleting Link: %s:%d... " % (link.host, link.port), diff --git a/python/qpid/management.py b/python/qpid/management.py index 8d8339b2c6..81d9dbe030 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -285,7 +285,7 @@ class managementClient: ft = {} ft["_class"] = className codec.write_map (ft) - msg = channel.message(codec.encoded, routing_key="agent.%d" % bank) + msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank) channel.send ("qpid.management", msg) def syncWaitForStable (self, channel): @@ -398,7 +398,7 @@ class managementClient: """ Compose the header of a management message. """ codec.write_uint8 (ord ('A')) codec.write_uint8 (ord ('M')) - codec.write_uint8 (ord ('1')) + codec.write_uint8 (ord ('2')) codec.write_uint8 (opcode) codec.write_uint32 (seq) @@ -412,7 +412,7 @@ class managementClient: if octet != 'M': return None octet = chr (codec.read_uint8 ()) - if octet != '1': + if octet != '2': return None opcode = chr (codec.read_uint8 ()) seq = codec.read_uint32 () @@ -433,7 +433,7 @@ class managementClient: elif typecode == 6: codec.write_str8 (value) elif typecode == 7: - codec.write_vbin32 (value) + codec.write_str16 (value) elif typecode == 8: # ABSTIME codec.write_uint64 (long (value)) elif typecode == 9: # DELTATIME @@ -476,7 +476,7 @@ class managementClient: elif typecode == 6: data = str (codec.read_str8 ()) elif typecode == 7: - data = codec.read_vbin32 () + data = codec.read_str16 () elif typecode == 8: # ABSTIME data = codec.read_uint64 () elif typecode == 9: # DELTATIME @@ -604,6 +604,9 @@ class managementClient: ch.send ("qpid.management", smsg) def handleClassInd (self, ch, codec): + kind = codec.read_uint8() + if kind != 1: # This API doesn't handle new-style events + return pname = str (codec.read_str8()) cname = str (codec.read_str8()) hash = codec.read_bin128() @@ -656,13 +659,15 @@ class managementClient: def parseSchema (self, ch, codec): """ Parse a received schema-description message. """ self.decOutstanding (ch) + kind = codec.read_uint8() + if kind != 1: # This API doesn't handle new-style events + return packageName = str (codec.read_str8 ()) className = str (codec.read_str8 ()) hash = codec.read_bin128 () configCount = codec.read_uint16 () instCount = codec.read_uint16 () methodCount = codec.read_uint16 () - eventCount = codec.read_uint16 () if packageName not in self.packages: return @@ -676,7 +681,6 @@ class managementClient: configs = [] insts = [] methods = {} - events = {} configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) insts.append (("id", 4, None, None)) @@ -765,42 +769,14 @@ class managementClient: args.append (arg) methods[mname] = (mdesc, args) - for idx in range (eventCount): - ft = codec.read_map () - ename = str (ft["name"]) - argCount = ft["argCount"] - if "desc" in ft: - edesc = str (ft["desc"]) - else: - edesc = None - - args = [] - for aidx in range (argCount): - ft = codec.read_map () - name = str (ft["name"]) - type = ft["type"] - unit = None - desc = None - - for key, value in ft.items (): - if key == "unit": - unit = str (value) - elif key == "desc": - desc = str (value) - - arg = (name, type, unit, desc) - args.append (arg) - events[ename] = (edesc, args) - schemaClass = {} schemaClass['C'] = configs schemaClass['I'] = insts schemaClass['M'] = methods - schemaClass['E'] = events self.schema[classKey] = schemaClass if self.schemaCb != None: - self.schemaCb (ch.context, classKey, configs, insts, methods, events) + self.schemaCb (ch.context, classKey, configs, insts, methods, {}) def parsePresenceMasks(self, codec, schemaClass): """ Generate a list of not-present properties """ @@ -896,7 +872,7 @@ class managementClient: codec.write_str8 (classId[1]) codec.write_bin128 (classId[2]) codec.write_str8 (methodName) - bank = objId.getBank() + bank = "%d.%d" % (objId.getBroker(), objId.getBank()) # Encode args according to schema if classId not in self.schema: @@ -926,5 +902,5 @@ class managementClient: packageName = classId[0] className = classId[1] - msg = channel.message(codec.encoded, "agent." + str(bank)) + msg = channel.message(codec.encoded, "agent." + bank) channel.send ("qpid.management", msg) diff --git a/python/qpid/managementdata.py b/python/qpid/managementdata.py index d86dd3a360..2bf66a5e5b 100644 --- a/python/qpid/managementdata.py +++ b/python/qpid/managementdata.py @@ -546,10 +546,10 @@ class ManagementData: for classKey in sorted: tuple = self.schema[classKey] row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]), - len (tuple[2]), len (tuple[3])) + len (tuple[2])) rows.append (row) self.disp.table ("Classes in Schema:", - ("Class", "Properties", "Statistics", "Methods", "Events"), + ("Class", "Properties", "Statistics", "Methods"), rows) finally: self.lock.release () diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py index 3800e54b5b..c8035e87f2 100644 --- a/python/qpid/qmfconsole.py +++ b/python/qpid/qmfconsole.py @@ -49,7 +49,7 @@ class Console: """ Invoked when a QMF package is discovered. """ pass - def newClass(self, classKey): + def newClass(self, kind, classKey): """ Invoked when a new class is discovered. Session.getSchema can be used to obtain details about the class.""" pass @@ -158,7 +158,7 @@ class Session: raise Exception(broker.error) self.brokers.append(broker) - self.getObjects(broker=broker, cls="agent") + self.getObjects(broker=broker, _class="agent") return broker def delBroker(self, broker): @@ -219,35 +219,36 @@ class Session: The class for queried objects may be specified in one of the following ways: - schema = <schema> - supply a schema object returned from getSchema - key = <key> - supply a classKey from the list returned by getClasses - cls = <name> - supply a class name as a string + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. If objects should be obtained from only one agent, use the following argument. Otherwise, the query will go to all agents. - agent = <agent> - supply an agent from the list returned by getAgents + _agent = <agent> - supply an agent from the list returned by getAgents. If the get query is to be restricted to one broker (as opposed to all connected brokers), add the following argument: - broker = <broker> - supply a broker as returned by addBroker + _broker = <broker> - supply a broker as returned by addBroker. If additional arguments are supplied, they are used as property selectors. For example, if the argument name="test" is supplied, only objects whose "name" property is "test" will be returned in the result. """ - if "broker" in kwargs: + if "_broker" in kwargs: brokerList = [] - brokerList.append(kwargs["broker"]) + brokerList.append(kwargs["_broker"]) else: brokerList = self.brokers for broker in brokerList: broker._waitForStable() agentList = [] - if "agent" in kwargs: - agent = kwargs["agent"] + if "_agent" in kwargs: + agent = kwargs["_agent"] if agent.broker not in brokerList: raise Exception("Supplied agent is not accessible through the supplied broker") agentList.append(agent) @@ -257,11 +258,14 @@ class Session: agentList.append(agent) cname = None - if "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey() - elif "key" in kwargs: pname, cname, hash = kwargs["key"] - elif "cls" in kwargs: pname, cname, hash = None, kwargs["cls"], None + if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey() + elif "_key" in kwargs: pname, cname, hash = kwargs["_key"] + elif "_class" in kwargs: + pname, cname, hash = None, kwargs["_class"], None + if "_package" in kwargs: + pname = kwargs["_package"] if cname == None: - raise Exception("No class supplied, use 'schema', 'key', or 'cls' argument") + raise Exception("No class supplied, use '_schema', '_key', or '_class' argument") map = {} map["_class"] = cname if pname != None: map["_package"] = pname @@ -269,7 +273,7 @@ class Session: self.getSelect = [] for item in kwargs: - if item != "schema" and item != "key" and item != "cls": + if item[0] != '_': self.getSelect.append((item, kwargs[item])) self.getResult = [] @@ -282,7 +286,7 @@ class Session: self.cv.release() broker._setHeader(sendCodec, 'G', seq) sendCodec.write_map(map) - smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank) + smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank) broker._send(smsg) starttime = time() @@ -382,6 +386,7 @@ class Session: self.cv.release() def _handleClassInd(self, broker, codec, seq): + kind = codec.read_uint8() pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() @@ -431,17 +436,18 @@ class Session: self.console.event(broker, event) def _handleSchemaResp(self, broker, codec, seq): + kind = codec.read_uint8() pname = str(codec.read_str8()) cname = str(codec.read_str8()) hash = codec.read_bin128() classKey = (pname, cname, hash) - _class = SchemaClass(classKey, codec) + _class = SchemaClass(kind, classKey, codec) self.cv.acquire() self.packages[pname][(cname, hash)] = _class self.cv.release() broker._decOutstanding() if self.console != None: - self.console.newClass(classKey) + self.console.newClass(kind, classKey) def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): pname = str(codec.read_str8()) @@ -485,7 +491,7 @@ class Session: def _selectMatch(self, object): """ Check the object against self.getSelect to check for a match """ for key, value in self.getSelect: - for prop, propval in object.properties: + for prop, propval in object.getProperties(): if key == prop.name and value != propval: return False return True @@ -497,7 +503,7 @@ class Session: elif typecode == 3: data = codec.read_uint32() # U32 elif typecode == 4: data = codec.read_uint64() # U64 elif typecode == 6: data = str(codec.read_str8()) # SSTR - elif typecode == 7: data = codec.read_vbin32() # LSTR + elif typecode == 7: data = codec.read_str16() # LSTR elif typecode == 8: data = codec.read_int64() # ABSTIME elif typecode == 9: data = codec.read_uint64() # DELTATIME elif typecode == 10: data = ObjectId(codec) # REF @@ -521,7 +527,7 @@ class Session: elif typecode == 3: codec.write_uint32 (long(value)) # U32 elif typecode == 4: codec.write_uint64 (long(value)) # U64 elif typecode == 6: codec.write_str8 (value) # SSTR - elif typecode == 7: codec.write_vbin32 (value) # LSTR + elif typecode == 7: codec.write_str16 (value) # LSTR elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME elif typecode == 10: value.encode (codec) # REF @@ -577,30 +583,42 @@ class ClassKey: class SchemaClass: """ """ - def __init__(self, key, codec): + CLASS_KIND_TABLE = 1 + CLASS_KIND_EVENT = 2 + + def __init__(self, kind, key, codec): + self.kind = kind self.classKey = key self.properties = [] self.statistics = [] - self.methods = [] - self.events = [] - - propCount = codec.read_uint16() - statCount = codec.read_uint16() - methodCount = codec.read_uint16() - eventCount = codec.read_uint16() - - for idx in range(propCount): - self.properties.append(SchemaProperty(codec)) - for idx in range(statCount): - self.statistics.append(SchemaStatistic(codec)) - for idx in range(methodCount): - self.methods.append(SchemaMethod(codec)) - for idx in range(eventCount): - self.events.append(SchemaEvent(codec)) + self.methods = [] + self.arguments = [] + + if self.kind == self.CLASS_KIND_TABLE: + propCount = codec.read_uint16() + statCount = codec.read_uint16() + methodCount = codec.read_uint16() + for idx in range(propCount): + self.properties.append(SchemaProperty(codec)) + for idx in range(statCount): + self.statistics.append(SchemaStatistic(codec)) + for idx in range(methodCount): + self.methods.append(SchemaMethod(codec)) + + elif self.kind == self.CLASS_KIND_EVENT: + argCount = codec.read_uint16() + for idx in range(argCount): + self.arguments.append(SchemaArgument(codec, methodArg=False)) def __repr__(self): pname, cname, hash = self.classKey - result = "Class: %s:%s " % (pname, cname) + if self.kind == self.CLASS_KIND_TABLE: + kindStr = "Table" + elif self.kind == self.CLASS_KIND_EVENT: + kindStr = "Event" + else: + kindStr = "Unsupported" + result = "%s Class: %s:%s " % (kindStr, pname, cname) result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash) return result @@ -620,9 +638,9 @@ class SchemaClass: """ Return the list of methods for the class. """ return self.methods - def getEvents(self): + def getArguments(self): """ Return the list of events for the class. """ - return self.events + return self.arguments class SchemaProperty: """ """ @@ -693,33 +711,6 @@ class SchemaMethod: result += ")" return result -class SchemaEvent: - """ """ - def __init__(self, codec): - map = codec.read_map() - self.name = str(map["name"]) - argCount = map["argCount"] - if "desc" in map: - self.desc = str(map["desc"]) - else: - self.desc = None - self.arguments = [] - - for idx in range(argCount): - self.arguments.append(SchemaArgument(codec, methodArg=False)) - - def __repr__(self): - result = self.name + "(" - first = True - for arg in self.arguments: - if first: - first = False - else: - result += ", " - result += arg.name - result += ")" - return result - class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -743,7 +734,7 @@ class SchemaArgument: elif key == "desc" : self.desc = str(value) elif key == "default" : self.default = str(value) -class ObjectId(object): +class ObjectId: """ Object that represents QMF object identifiers """ def __init__(self, codec, first=0, second=0): if codec: @@ -800,80 +791,86 @@ class Object(object): """ """ def __init__(self, session, broker, schema, codec, prop, stat): """ """ - self.session = session - self.broker = broker - self.schema = schema - self.currentTime = codec.read_uint64() - self.createTime = codec.read_uint64() - self.deleteTime = codec.read_uint64() - self.objectId = ObjectId(codec) - self.properties = [] - self.statistics = [] + self._session = session + self._broker = broker + self._schema = schema + self._currentTime = codec.read_uint64() + self._createTime = codec.read_uint64() + self._deleteTime = codec.read_uint64() + self._objectId = ObjectId(codec) + self._properties = [] + self._statistics = [] if prop: notPresent = self._parsePresenceMasks(codec, schema) for property in schema.getProperties(): if property.name in notPresent: - self.properties.append((property, None)) + self._properties.append((property, None)) else: - self.properties.append((property, self.session._decodeValue(codec, property.type))) + self._properties.append((property, self._session._decodeValue(codec, property.type))) if stat: for statistic in schema.getStatistics(): - self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type))) + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type))) def getObjectId(self): """ Return the object identifier for this object """ - return self.objectId + return self._objectId def getClassKey(self): """ Return the class-key that references the schema describing this object. """ - return self.schema.getKey() + return self._schema.getKey() def getSchema(self): """ Return the schema that describes this object. """ - return self.schema + return self._schema def getMethods(self): """ Return a list of methods available for this object. """ - return self.schema.getMethods() + return self._schema.getMethods() def getTimestamps(self): """ Return the current, creation, and deletion times for this object. """ - return self.currentTime, self.createTime, self.deleteTime + return self._currentTime, self._createTime, self._deleteTime def getIndex(self): """ Return a string describing this object's primary key. """ result = "" - for property, value in self.properties: + for property, value in self._properties: if property.index: if result != "": result += ":" result += str(value) return result + def getProperties(self): + return self._properties + + def getStatistics(self): + return self._statistics + def __repr__(self): return self.getIndex() def __getattr__(self, name): - for method in self.schema.getMethods(): + for method in self._schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for property, value in self.properties: + for property, value in self._properties: if name == property.name: return value - for statistic, value in self.statistics: + for statistic, value in self._statistics: if name == statistic.name: return value raise Exception("Type Object has no attribute '%s'" % name) def _invoke(self, name, args, kwargs): - for method in self.schema.getMethods(): + for method in self._schema.getMethods(): if name == method.name: aIdx = 0 - sendCodec = Codec(self.broker.conn.spec) - seq = self.session.seqMgr._reserve((self, method)) - self.broker._setHeader(sendCodec, 'M', seq) - self.objectId.encode(sendCodec) - pname, cname, hash = self.schema.getKey() + sendCodec = Codec(self._broker.conn.spec) + seq = self._session.seqMgr._reserve((self, method)) + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + pname, cname, hash = self._schema.getKey() sendCodec.write_str8(pname) sendCodec.write_str8(cname) sendCodec.write_bin128(hash) @@ -888,29 +885,30 @@ class Object(object): for arg in method.arguments: if arg.dir.find("I") != -1: - self.session._encodeValue(sendCodec, args[aIdx], arg.type) + self._session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 - smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank())) - self.broker.cv.acquire() - self.broker.syncInFlight = True - self.broker.cv.release() + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % + (self._objectId.getBroker(), self._objectId.getBank())) + self._broker.cv.acquire() + self._broker.syncInFlight = True + self._broker.cv.release() - self.broker._send(smsg) + self._broker._send(smsg) - self.broker.cv.acquire() + self._broker.cv.acquire() starttime = time() - while self.broker.syncInFlight and self.broker.error == None: - self.broker.cv.wait(self.broker.SYNC_TIME) - if time() - starttime > self.broker.SYNC_TIME: - self.broker.cv.release() - self.session.seqMgr._release(seq) + while self._broker.syncInFlight and self._broker.error == None: + self._broker.cv.wait(self._broker.SYNC_TIME) + if time() - starttime > self._broker.SYNC_TIME: + self._broker.cv.release() + self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") - self.broker.cv.release() - if self.broker.error != None: - errorText = self.broker.error - self.broker.error = None + self._broker.cv.release() + if self._broker.error != None: + errorText = self._broker.error + self._broker.error = None raise Exception(errorText) - return self.broker.syncResult + return self._broker.syncResult raise Exception("Invalid Method (software defect) [%s]" % name) def _parsePresenceMasks(self, codec, schema): @@ -954,7 +952,7 @@ class Broker: self.authUser = authUser self.authPass = authPass self.agents = {} - self.agents[0] = Agent(self, 0, "BrokerAgent") + self.agents[0] = Agent(self, "1.0", "BrokerAgent") self.topicBound = False self.cv = Condition() self.syncInFlight = False @@ -1040,14 +1038,15 @@ class Broker: self.error = "Connect Failed %d - %s" % (e[0], e[1]) def _updateAgent(self, obj): - if obj.deleteTime == 0: - if obj.objectIdBank not in self.agents: - agent = Agent(self, obj.objectIdBank, obj.label) - self.agents[obj.objectIdBank] = agent + bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank) + if obj._deleteTime == 0: + if bankKey not in self.agents: + agent = Agent(self, bankKey, obj.label) + self.agents[bankKey] = agent if self.session.console != None: self.session.console.newAgent(agent) else: - agent = self.agents.pop(obj.objectIdBank, None) + agent = self.agents.pop(bankKey, None) if agent != None and self.session.console != None: self.session.console.delAgent(agent) @@ -1055,7 +1054,7 @@ class Broker: """ Compose the header of a management message. """ codec.write_uint8(ord('A')) codec.write_uint8(ord('M')) - codec.write_uint8(ord('1')) + codec.write_uint8(ord('2')) codec.write_uint8(ord(opcode)) codec.write_uint32(seq) @@ -1068,7 +1067,7 @@ class Broker: if octet != 'M': return None, None octet = chr(codec.read_uint8()) - if octet != '1': + if octet != '2': return None, None opcode = chr(codec.read_uint8()) seq = codec.read_uint32() @@ -1164,28 +1163,24 @@ class Agent: self.label = label def __repr__(self): - return "Agent at bank %d (%s)" % (self.bank, self.label) + return "Agent at bank %s (%s)" % (self.bank, self.label) class Event: """ """ def __init__(self, session, codec): self.session = session - self.timestamp = codec.read_int64() - self.objectId = ObjectId(codec) pname = codec.read_str8() cname = codec.read_str8() hash = codec.read_bin128() self.classKey = (pname, cname, hash) - self.name = codec.read_str8() + self.timestamp = codec.read_int64() + self.schema = None if pname in session.packages: if (cname, hash) in session.packages[pname]: - schema = session.packages[pname][(cname, hash)] - for event in schema.getEvents(): - if event.name == self.name: - self.schemaEvent = event - self.arguments = {} - for arg in event.arguments: - self.arguments[arg.name] = session._decodeValue(codec, arg.type) + self.schema = session.packages[pname][(cname, hash)] + self.arguments = {} + for arg in self.schema.arguments: + self.arguments[arg.name] = session._decodeValue(codec, arg.type) def __repr__(self): return self.getSyslogText() @@ -1202,10 +1197,15 @@ class Event: def getName(self): return self.name + def getSchema(self): + return self.schema + def getSyslogText(self): + if self.schema == None: + return "<uninterpretable>" out = strftime("%c", gmtime(self.timestamp / 1000000000)) - out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name - for arg in self.schemaEvent.arguments: + out += " " + self.classKey[0] + ":" + self.classKey[1] + for arg in self.schema.arguments: out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type) return out @@ -1247,8 +1247,8 @@ class DebugConsole(Console): def newPackage(self, name): print "newPackage:", name - def newClass(self, classKey): - print "newClass:", classKey + def newClass(self, kind, classKey): + print "newClass:", kind, classKey def newAgent(self, agent): print "newAgent:", agent diff --git a/python/tests_0-10/management.py b/python/tests_0-10/management.py index eea1b29404..efec2b8a92 100644 --- a/python/tests_0-10/management.py +++ b/python/tests_0-10/management.py @@ -59,7 +59,7 @@ class ManagementTest (TestBase010): session = self.session self.startQmf() - brokers = self.qmf.getObjects(cls="broker") + brokers = self.qmf.getObjects(_class="broker") self.assertEqual (len(brokers), 1) broker = brokers[0] @@ -147,43 +147,43 @@ class ManagementTest (TestBase010): session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True) session.exchange_bind(queue="dest-queue", exchange="amq.direct") - queues = self.qmf.getObjects(cls="queue") + queues = self.qmf.getObjects(_class="queue") "Move 10 messages from src-queue to dest-queue" - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10) self.assertEqual (result.status, 0) - sq = self.qmf.getObjects(cls="queue", name="src-queue")[0] - dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0] + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,10) self.assertEqual (dq.msgDepth,10) "Move all remaining messages to destination" - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0) self.assertEqual (result.status,0) - sq = self.qmf.getObjects(cls="queue", name="src-queue")[0] - dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0] + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,0) self.assertEqual (dq.msgDepth,20) "Use a bad source queue name" - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0) self.assertEqual (result.status,4) "Use a bad destination queue name" - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0) self.assertEqual (result.status,4) " Use a large qty (40) to move from dest-queue back to " " src-queue- should move all " - result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) + result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40) self.assertEqual (result.status,0) - sq = self.qmf.getObjects(cls="queue", name="src-queue")[0] - dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0] + sq = self.qmf.getObjects(_class="queue", name="src-queue")[0] + dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0] self.assertEqual (sq.msgDepth,20) self.assertEqual (dq.msgDepth,0) @@ -216,23 +216,23 @@ class ManagementTest (TestBase010): msg = Message(props, body) session.message_transfer(destination="amq.direct", message=msg) - pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0] + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] "Purge top message from purge-queue" result = pq.purge(1) self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0] + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,19) "Purge top 9 messages from purge-queue" result = pq.purge(9) self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0] + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,10) "Purge all messages from purge-queue" result = pq.purge(0) self.assertEqual (result.status, 0) - pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0] + pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) diff --git a/specs/management-schema.xml b/specs/management-schema.xml index a06ddca44b..dc42a4108e 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -86,17 +86,6 @@ <arg name="password" dir="I" type="sstr"/> </method> - <event name="agentConnect" desc="QMF Management Agent has connected to the broker"> - <arg name="remoteAddress" type="sstr"/> - <arg name="label" type="sstr"/> - <arg name="brokerBank" type="uint32"/> - <arg name="agentBank" type="uint32"/> - </event> - - <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker"> - <arg name="remoteAddress" type="sstr"/> - </event> - <method name="queueMoveMessages" desc="Move messages from one queue to another"> <arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/> <arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/> @@ -115,7 +104,8 @@ <property name="label" type="sstr" access="RO" desc="Label for agent"/> <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/> <property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/> - <property name="objectIdBank" type="uint32" access="RO" desc="Assigned object-id bank"/> + <property name="brokerBank" type="uint32" access="RO" desc="Assigned object-id broker bank"/> + <property name="agentBank" type="uint32" access="RO" desc="Assigned object-id agent bank"/> </class> <!-- @@ -218,7 +208,7 @@ <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> <property name="address" type="sstr" access="RC" index="y"/> <property name="incoming" type="bool" access="RC"/> - <property name="SystemConnection" type="bool" access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation ,...)"/> + <property name="SystemConnection" type="bool" access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation, ...)"/> <statistic name="closing" type="bool" desc="This client is closing by management request"/> <statistic name="federationLink" type="bool" desc="Is this a federation link"/> @@ -305,5 +295,36 @@ <method name="resetLifespan"/> <method name="close"/> </class> + + <eventArguments> + <arg name="altEx" type="sstr" desc="Name of the alternate exchange"/> + <arg name="args" type="map" desc="Supplemental arguments or parameters supplied"/> + <arg name="autoDel" type="bool" desc="Created object is automatically deleted when no longer in use"/> + <arg name="dest" type="sstr" desc="Destination tag for a subscription"/> + <arg name="disp" type="sstr" desc="Disposition of a declaration: 'created' if object was created, 'existing' if object already existed"/> + <arg name="durable" type="bool" desc="Created object is durable"/> + <arg name="exName" type="sstr" desc="Name of an exchange"/> + <arg name="exType" type="sstr" desc="Type of an exchange"/> + <arg name="excl" type="bool" desc="Created object is exclusive for the use of the owner only"/> + <arg name="key" type="sstr" desc="Key text used for routing or binding"/> + <arg name="qName" type="sstr" desc="Name of a queue"/> + <arg name="rhost" type="sstr" desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/> + <arg name="user" type="sstr" desc="Authentication identity"/> + </eventArguments> + + <event name="clientConnect" args="rhost, user"/> + <event name="clientDisconnect" args="rhost, user"/> + <event name="agentConnect" args="rhost, user"/> + <event name="agentDisconnect" args="rhost, user"/> + <event name="brokerConnect" args="rhost, user"/> + <event name="brokerDisconnect" args="rhost, user"/> + <event name="queueDeclare" args="rhost, user, qName, durable, excl, autoDel, args, disp"/> + <event name="queueDelete" args="rhost, user, qName"/> + <event name="exchangeDeclare" args="rhost, user, exName, exType, altEx, durable, autoDel, args, disp"/> + <event name="exchangeDelete" args="rhost, user, exName"/> + <event name="bind" args="rhost, user, exName, qName, key, args"/> + <event name="unbind" args="rhost, user, exName, qName, key"/> + <event name="subscribe" args="rhost, user, qName, dest, excl, args"/> + <event name="unsubscribe" args="rhost, user, dest"/> </schema> |