summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/examples/qmf-agent/example.cpp3
-rw-r--r--cpp/examples/qmf-agent/schema.xml17
-rwxr-xr-xcpp/managementgen/qmf-gen2
-rwxr-xr-xcpp/managementgen/qmf/generate.py18
-rw-r--r--cpp/managementgen/qmf/management-types.xml2
-rwxr-xr-xcpp/managementgen/qmf/schema.py246
-rw-r--r--cpp/managementgen/qmf/templates/Class.cpp11
-rw-r--r--cpp/managementgen/qmf/templates/Class.h12
-rw-r--r--cpp/managementgen/qmf/templates/Event.cpp77
-rw-r--r--cpp/managementgen/qmf/templates/Event.h58
-rw-r--r--cpp/src/qpid/acl/Acl.cpp29
-rw-r--r--cpp/src/qpid/acl/Acl.h3
-rw-r--r--cpp/src/qpid/acl/management-schema.xml42
-rw-r--r--cpp/src/qpid/agent/ManagementAgent.h37
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp186
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h70
-rw-r--r--cpp/src/qpid/broker/Connection.cpp1
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h4
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp268
-rw-r--r--cpp/src/qpid/framing/Buffer.h4
-rw-r--r--cpp/src/qpid/management/Manageable.h1
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp328
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h60
-rw-r--r--cpp/src/qpid/management/ManagementEvent.h48
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp30
-rw-r--r--cpp/src/qpid/management/ManagementObject.h58
-rw-r--r--java/client/src/main/java/org/apache/qpid/management/Names.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/management/Protocol.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java8
-rwxr-xr-xpython/commands/qpid-config22
-rwxr-xr-xpython/commands/qpid-printevents5
-rwxr-xr-xpython/commands/qpid-route24
-rw-r--r--python/qpid/management.py52
-rw-r--r--python/qpid/managementdata.py4
-rw-r--r--python/qpid/qmfconsole.py274
-rw-r--r--python/tests_0-10/management.py34
-rw-r--r--specs/management-schema.xml47
37 files changed, 1246 insertions, 847 deletions
diff --git a/cpp/examples/qmf-agent/example.cpp b/cpp/examples/qmf-agent/example.cpp
index 7c514b1010..e24e729e1d 100644
--- a/cpp/examples/qmf-agent/example.cpp
+++ b/cpp/examples/qmf-agent/example.cpp
@@ -26,6 +26,7 @@
#include "qmf/org/apache/qpid/agent/example/Parent.h"
#include "qmf/org/apache/qpid/agent/example/Child.h"
#include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h"
+#include "qmf/org/apache/qpid/agent/example/EventChildCreated.h"
#include "qmf/org/apache/qpid/agent/example/Package.h"
#include <unistd.h>
@@ -129,7 +130,7 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args,
children.push_back(child);
- mgmtObject->event_childCreated(ioArgs.i_name);
+ agent->raiseEvent(_qmf::EventChildCreated(ioArgs.i_name));
return STATUS_OK;
}
diff --git a/cpp/examples/qmf-agent/schema.xml b/cpp/examples/qmf-agent/schema.xml
index 22ecabb8a5..1bf701a655 100644
--- a/cpp/examples/qmf-agent/schema.xml
+++ b/cpp/examples/qmf-agent/schema.xml
@@ -37,14 +37,6 @@
<arg name="name" dir="I" type="sstr"/>
<arg name="childRef" dir="O" type="objId"/>
</method>
-
- <event name="childCreated">
- <arg name="name" type="sstr"/>
- </event>
-
- <event name="childDestroyed">
- <arg name="name" type="sstr"/>
- </event>
</class>
@@ -59,7 +51,14 @@
<statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
- <method name="delete"/>
+ <method name="delete"/>
</class>
+
+ <eventArguments>
+ <arg name="childName" type="sstr"/>
+ </eventArguments>
+
+ <event name="ChildCreated" args="childName"/>
+ <event name="ChildDestroyed" args="childName"/>
</schema>
diff --git a/cpp/managementgen/qmf-gen b/cpp/managementgen/qmf-gen
index 523579fe6c..62362e3cad 100755
--- a/cpp/managementgen/qmf-gen
+++ b/cpp/managementgen/qmf-gen
@@ -58,6 +58,8 @@ for schemafile in args:
gen.makeClassFiles ("Class.h", package)
gen.makeClassFiles ("Class.cpp", package)
gen.makeMethodFiles ("Args.h", package)
+ gen.makeEventFiles ("Event.h", package)
+ gen.makeEventFiles ("Event.cpp", package)
gen.makePackageFile ("Package.h", package)
gen.makePackageFile ("Package.cpp", package)
diff --git a/cpp/managementgen/qmf/generate.py b/cpp/managementgen/qmf/generate.py
index 7346200a28..958728d739 100755
--- a/cpp/managementgen/qmf/generate.py
+++ b/cpp/managementgen/qmf/generate.py
@@ -250,6 +250,14 @@ class Generator:
path = self.packagePath + _class.getNameCap () + extension
return path
+ def targetEventFile (self, event, templateFile):
+ dot = templateFile.find(".")
+ if dot == -1:
+ raise ValueError ("Invalid template file name %s" % templateFile)
+ extension = templateFile[dot:len (templateFile)]
+ path = self.packagePath + "Event" + event.getNameCap () + extension
+ return path
+
def targetMethodFile (self, method, templateFile):
""" Return the file name for a method file """
dot = templateFile.rfind(".")
@@ -293,6 +301,16 @@ class Generator:
stream = template.expand (_class)
self.writeIfChanged (stream, target, force)
+ def makeEventFiles (self, templateFile, schema, force=False):
+ """ Generate an expanded template per schema event """
+ events = schema.getEvents()
+ template = Template (self.input + templateFile, self)
+ self.templateFiles.append (templateFile)
+ for event in events:
+ target = self.targetEventFile(event, templateFile)
+ stream = template.expand(event)
+ self.writeIfChanged(stream, target, force)
+
def makeMethodFiles (self, templateFile, schema, force=False):
""" Generate an expanded template per method-with-arguments """
classes = schema.getClasses ()
diff --git a/cpp/managementgen/qmf/management-types.xml b/cpp/managementgen/qmf/management-types.xml
index 6e34421d99..626880afb3 100644
--- a/cpp/managementgen/qmf/management-types.xml
+++ b/cpp/managementgen/qmf/management-types.xml
@@ -30,7 +30,7 @@
<type name="int64" base="S64" cpp="int64_t" encode="@.putInt64(#)" decode="# = @.getInt64()" accessor="direct" init="0"/>
<type name="bool" base="BOOL" cpp="uint8_t" encode="@.putOctet(#?1:0)" decode="# = @.getOctet()==1" accessor="direct" init="0"/>
<type name="sstr" base="SSTR" cpp="std::string" encode="@.putShortString(#)" decode="@.getShortString(#)" accessor="direct" init='""' byRef="y"/>
-<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString(#)" decode="@.getLongString(#)" accessor="direct" init='""' byRef="y"/>
+<type name="lstr" base="LSTR" cpp="std::string" encode="@.putMediumString(#)" decode="@.getMediumString(#)" accessor="direct" init='""' byRef="y"/>
<type name="absTime" base="ABSTIME" cpp="int64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" accessor="direct" init="0"/>
<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" accessor="direct" init="0"/>
<type name="float" base="FLOAT" cpp="float" encode="@.putFloat(#)" decode="# = @.getFloat()" accessor="direct" init="0."/>
diff --git a/cpp/managementgen/qmf/schema.py b/cpp/managementgen/qmf/schema.py
index 2ecf9c351f..48e697ab4a 100755
--- a/cpp/managementgen/qmf/schema.py
+++ b/cpp/managementgen/qmf/schema.py
@@ -21,6 +21,32 @@ from xml.dom.minidom import parse, parseString, Node
from cStringIO import StringIO
import md5
+class Hash:
+ """ Manage the hash of an XML sub-tree """
+ def __init__(self, node):
+ self.md5Sum = md5.new()
+ self._compute(node)
+
+ def addSubHash(self, hash):
+ """ Use this method to add the hash of a dependend-on XML fragment that is not in the sub-tree """
+ self.md5Sum.update(hash.getDigest())
+
+ def getDigest(self):
+ return self.md5Sum.digest()
+
+ def _compute(self, node):
+ attrs = node.attributes
+ self.md5Sum.update(node.nodeName)
+
+ for idx in range(attrs.length):
+ self.md5Sum.update(attrs.item(idx).nodeName)
+ self.md5Sum.update(attrs.item(idx).nodeValue)
+
+ for child in node.childNodes:
+ if child.nodeType == Node.ELEMENT_NODE:
+ self._compute(child)
+
+
#=====================================================================================
#
#=====================================================================================
@@ -525,6 +551,7 @@ class SchemaArg:
self.maxLen = None
self.desc = None
self.default = None
+ self.hash = Hash(node)
attrs = node.attributes
for idx in range (attrs.length):
@@ -675,12 +702,12 @@ class SchemaMethod:
#
#=====================================================================================
class SchemaEvent:
- def __init__ (self, parent, node, typespec):
- self.parent = parent
- self.name = None
- self.desc = None
- self.args = []
- self.defaultSeverity = None
+ def __init__ (self, package, node, typespec, argset):
+ self.packageName = package
+ self.name = None
+ self.desc = None
+ self.args = []
+ self.hash = Hash(node)
attrs = node.attributes
for idx in range (attrs.length):
@@ -692,72 +719,96 @@ class SchemaEvent:
elif key == 'desc':
self.desc = val
- elif key == 'defaultSeverity':
- self.defaultSeverity = val
+ elif key == 'args':
+ list = val.replace(" ", "").split(",")
+ for item in list:
+ if item not in argset.args:
+ raise Exception("undefined argument '%s' in event" % item)
+ self.args.append(argset.args[item])
+ self.hash.addSubHash(argset.args[item].hash)
else:
raise ValueError ("Unknown attribute in event '%s'" % key)
- for child in node.childNodes:
- if child.nodeType == Node.ELEMENT_NODE:
- if child.nodeName == 'arg':
- arg = SchemaArg (child, typespec)
- self.args.append (arg)
- else:
- raise ValueError ("Unknown event tag '%s'" % child.nodeName)
-
def getName (self):
return self.name
+ def getNameCap(self):
+ return capitalize(self.name)
+
def getFullName (self):
- return capitalize(self.parent.getName()) + capitalize(self.name)
+ return capitalize(self.package + capitalize(self.name))
def getArgCount (self):
return len (self.args)
- def genMethodBody (self, stream, variables, classObject):
- stream.write("void ")
- classObject.genNameCap(stream, variables)
- stream.write("::event_%s(" % self.name)
- count = 0
+ def genArgCount (self, stream, variables):
+ stream.write("%d" % len(self.args))
+
+ def genArgDeclarations(self, stream, variables):
for arg in self.args:
- arg.genFormalParam(stream, variables)
- count += 1
- if count < len(self.args):
- stream.write(", ")
- stream.write(") {\n")
- stream.write(" ::qpid::sys::Mutex::ScopedLock mutex(getMutex());\n")
- stream.write(" Buffer* buf = startEventLH();\n")
- stream.write(" objectId.encode(*buf);\n")
- stream.write(" buf->putShortString(packageName);\n")
- stream.write(" buf->putShortString(className);\n")
- stream.write(" buf->putBin128(md5Sum);\n")
- stream.write(" buf->putShortString(\"%s\");\n" % self.name)
+ if arg.type.type.byRef:
+ ref = "&"
+ else:
+ ref = ""
+ stream.write(" const %s%s %s;\n" % (arg.type.type.cpp, ref, arg.name))
+
+ def genCloseNamespaces (self, stream, variables):
+ for item in self.packageName.split("."):
+ stream.write ("}")
+
+ def genConstructorArgs(self, stream, variables):
+ pre = ""
for arg in self.args:
- stream.write(" %s;\n" % arg.type.type.encode.replace("@", "(*buf)").replace("#", "_" + arg.name))
- stream.write(" finishEventLH(buf);\n")
- stream.write("}\n\n")
+ if arg.type.type.byRef:
+ ref = "&"
+ else:
+ ref = ""
+ stream.write("%sconst %s%s _%s" % (pre, arg.type.type.cpp, ref, arg.name))
+ pre = ",\n "
- def genMethodDecl (self, stream, variables):
- stream.write(" void event_%s(" % self.name)
- count = 0
+ def genConstructorInits(self, stream, variables):
+ pre = ""
for arg in self.args:
- arg.genFormalParam(stream, variables)
- count += 1
- if count < len(self.args):
- stream.write(", ")
- stream.write(");\n")
+ stream.write("%s%s(_%s)" % (pre, arg.name, arg.name))
+ pre = ",\n "
- def genSchema(self, stream, variables):
- stream.write (" ft = FieldTable ();\n")
- stream.write (" ft.setString (NAME, \"" + self.name + "\");\n")
- stream.write (" ft.setInt (ARGCOUNT, " + str (len (self.args)) + ");\n")
- if self.desc != None:
- stream.write (" ft.setString (DESC, \"" + self.desc + "\");\n")
- stream.write (" buf.put (ft);\n\n")
+ def genName(self, stream, variables):
+ stream.write(self.name)
+
+ def genNameCap(self, stream, variables):
+ stream.write(capitalize(self.name))
+
+ def genNamespace (self, stream, variables):
+ stream.write("::".join(self.packageName.split(".")))
+
+ def genNameLower(self, stream, variables):
+ stream.write(self.name.lower())
+
+ def genNameUpper(self, stream, variables):
+ stream.write(self.name.upper())
+
+ def genNamePackageLower(self, stream, variables):
+ stream.write(self.packageName.lower())
+
+ def genOpenNamespaces (self, stream, variables):
+ for item in self.packageName.split("."):
+ stream.write ("namespace %s {\n" % item)
+
+ def genArgEncodes(self, stream, variables):
+ for arg in self.args:
+ stream.write(" " + arg.type.type.encode.replace("@", "buf").replace("#", arg.name) + ";\n")
+
+ def genArgSchema(self, stream, variables):
for arg in self.args:
- arg.genSchema (stream, True)
+ arg.genSchema(stream, True)
+ def genSchemaMD5(self, stream, variables):
+ sum = self.hash.getDigest()
+ for idx in range (len (sum)):
+ if idx != 0:
+ stream.write (",")
+ stream.write (hex (ord (sum[idx])))
class SchemaClass:
@@ -768,9 +819,7 @@ class SchemaClass:
self.methods = []
self.events = []
self.options = options
- self.md5Sum = md5.new ()
-
- self.hash (node)
+ self.hash = Hash(node)
attrs = node.attributes
self.name = makeValidCppSymbol(attrs['name'].nodeValue)
@@ -790,10 +839,6 @@ class SchemaClass:
sub = SchemaMethod (self, child, typespec)
self.methods.append (sub)
- elif child.nodeName == 'event':
- sub = SchemaEvent (self, child, typespec)
- self.events.append (sub)
-
elif child.nodeName == 'group':
self.expandFragment (child, fragments)
@@ -820,24 +865,12 @@ class SchemaClass:
result = result[0:pos] + "threadStats->" + result[pos:]
start = pos + 9 + len(next[1])
- def hash (self, node):
- attrs = node.attributes
- self.md5Sum.update (node.nodeName)
-
- for idx in range (attrs.length):
- self.md5Sum.update (attrs.item(idx).nodeName)
- self.md5Sum.update (attrs.item(idx).nodeValue)
-
- for child in node.childNodes:
- if child.nodeType == Node.ELEMENT_NODE:
- self.hash (child)
-
def expandFragment (self, node, fragments):
attrs = node.attributes
name = attrs['name'].nodeValue
for fragment in fragments:
if fragment.name == name:
- self.md5Sum.update (fragment.md5Sum.digest())
+ self.hash.addSubHash(fragment.hash)
for config in fragment.properties:
self.properties.append (config)
for inst in fragment.statistics:
@@ -937,27 +970,12 @@ class SchemaClass:
inArgCount = inArgCount + 1
if methodCount == 0:
- stream.write ("string, Buffer&, Buffer& outBuf")
+ stream.write ("string&, Buffer&, Buffer& outBuf")
else:
if inArgCount == 0:
- stream.write ("string methodName, Buffer&, Buffer& outBuf")
+ stream.write ("string& methodName, Buffer&, Buffer& outBuf")
else:
- stream.write ("string methodName, Buffer& inBuf, Buffer& outBuf")
-
- def genEventCount (self, stream, variables):
- stream.write ("%d" % len (self.events))
-
- def genEventMethodBodies (self, stream, variables):
- for event in self.events:
- event.genMethodBody (stream, variables, self)
-
- def genEventMethodDecls (self, stream, variables):
- for event in self.events:
- event.genMethodDecl (stream, variables)
-
- def genEventSchema (self, stream, variables):
- for event in self.events:
- event.genSchema (stream, variables)
+ stream.write ("string& methodName, Buffer& inBuf, Buffer& outBuf")
def genHiLoStatResets (self, stream, variables):
for inst in self.statistics:
@@ -1124,7 +1142,7 @@ class SchemaClass:
return
def genSchemaMD5 (self, stream, variables):
- sum = self.md5Sum.digest ()
+ sum = self.hash.getDigest()
for idx in range (len (sum)):
if idx != 0:
stream.write (",")
@@ -1149,6 +1167,20 @@ class SchemaClass:
stat.genWrite (stream)
+class SchemaEventArgs:
+ def __init__(self, package, node, typespec, fragments, options):
+ self.packageName = package
+ self.options = options
+ self.args = {}
+
+ children = node.childNodes
+ for child in children:
+ if child.nodeType == Node.ELEMENT_NODE:
+ if child.nodeName == 'arg':
+ arg = SchemaArg(child, typespec)
+ self.args[arg.name] = arg
+ else:
+ raise Exception("Unknown tag '%s' in <eventArguments>" % child.nodeName)
class SchemaPackage:
def __init__ (self, typefile, schemafile, options):
@@ -1156,6 +1188,8 @@ class SchemaPackage:
self.classes = []
self.fragments = []
self.typespec = TypeSpec (typefile)
+ self.eventArgSet = None
+ self.events = []
dom = parse (schemafile)
document = dom.documentElement
@@ -1179,6 +1213,15 @@ class SchemaPackage:
self.fragments, options)
self.fragments.append (cls)
+ elif child.nodeName == 'eventArguments':
+ if self.eventArgSet:
+ raise Exception("Only one <eventArguments> may appear in a package")
+ self.eventArgSet = SchemaEventArgs(self.packageName, child, self.typespec, self.fragments, options)
+
+ elif child.nodeName == 'event':
+ event = SchemaEvent(self.packageName, child, self.typespec, self.eventArgSet)
+ self.events.append(event)
+
else:
raise ValueError ("Unknown schema tag '%s'" % child.nodeName)
@@ -1194,6 +1237,9 @@ class SchemaPackage:
def getClasses (self):
return self.classes
+ def getEvents(self):
+ return self.events
+
def genCloseNamespaces (self, stream, variables):
for item in self.packageName.split("."):
stream.write ("}")
@@ -1217,12 +1263,20 @@ class SchemaPackage:
stream.write ("#include \"")
_class.genNameCap (stream, variables)
stream.write (".h\"\n")
+ for _event in self.events:
+ stream.write ("#include \"Event")
+ _event.genNameCap(stream, variables)
+ stream.write (".h\"\n")
- def genClassRegisters (self, stream, variables):
+ def genClassRegisters(self, stream, variables):
for _class in self.classes:
- stream.write (" ")
- _class.genNameCap (stream, variables)
- stream.write ("::registerClass(agent);\n")
+ stream.write(" ")
+ _class.genNameCap(stream, variables)
+ stream.write("::registerSelf(agent);\n")
+ for _event in self.events:
+ stream.write(" Event")
+ _event.genNameCap(stream, variables)
+ stream.write("::registerSelf(agent);\n")
#=====================================================================================
diff --git a/cpp/managementgen/qmf/templates/Class.cpp b/cpp/managementgen/qmf/templates/Class.cpp
index 964e6f8349..0a69939821 100644
--- a/cpp/managementgen/qmf/templates/Class.cpp
+++ b/cpp/managementgen/qmf/templates/Class.cpp
@@ -85,9 +85,9 @@ namespace {
const string DEFAULT("default");
}
-void /*MGEN:Class.NameCap*/::registerClass(ManagementAgent* agent)
+void /*MGEN:Class.NameCap*/::registerSelf(ManagementAgent* agent)
{
- agent->RegisterClass(packageName, className, md5Sum, writeSchema);
+ agent->registerClass(packageName, className, md5Sum, writeSchema);
}
void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
@@ -95,13 +95,13 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
FieldTable ft;
// Schema class header:
+ buf.putOctet (CLASS_KIND_TABLE);
buf.putShortString (packageName); // Package Name
buf.putShortString (className); // Class Name
buf.putBin128 (md5Sum); // Schema Hash
buf.putShort (/*MGEN:Class.ConfigCount*/); // Config Element Count
buf.putShort (/*MGEN:Class.InstCount*/); // Inst Element Count
buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count
- buf.putShort (/*MGEN:Class.EventCount*/); // Event Count
// Properties
/*MGEN:Class.PropertySchema*/
@@ -109,8 +109,6 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf)
/*MGEN:Class.StatisticSchema*/
// Methods
/*MGEN:Class.MethodSchema*/
- // Events
-/*MGEN:Class.EventSchema*/
}
/*MGEN:IF(Class.ExistPerThreadStats)*/
@@ -176,9 +174,8 @@ void /*MGEN:Class.NameCap*/::doMethod (/*MGEN:Class.DoMethodArgs*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
std::string text;
+
/*MGEN:Class.MethodHandlers*/
outBuf.putLong(status);
outBuf.putShortString(Manageable::StatusText(status, text));
}
-
-/*MGEN:Class.EventMethodBodies*/
diff --git a/cpp/managementgen/qmf/templates/Class.h b/cpp/managementgen/qmf/templates/Class.h
index 99ebc68789..2a995c95a5 100644
--- a/cpp/managementgen/qmf/templates/Class.h
+++ b/cpp/managementgen/qmf/templates/Class.h
@@ -72,7 +72,7 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
void writeProperties (::qpid::framing::Buffer& buf);
void writeStatistics (::qpid::framing::Buffer& buf,
bool skipHeaders = false);
- void doMethod (std::string methodName,
+ void doMethod (std::string& methodName,
::qpid::framing::Buffer& inBuf,
::qpid::framing::Buffer& outBuf);
writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
@@ -88,17 +88,15 @@ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
/*MGEN:Class.SetGeneralReferenceDeclaration*/
- static void registerClass (::qpid::management::ManagementAgent* agent);
- std::string& getPackageName (void) { return packageName; }
- std::string& getClassName (void) { return className; }
- uint8_t* getMd5Sum (void) { return md5Sum; }
+ static void registerSelf (::qpid::management::ManagementAgent* agent);
+ std::string& getPackageName (void) const { return packageName; }
+ std::string& getClassName (void) const { return className; }
+ uint8_t* getMd5Sum (void) const { return md5Sum; }
// Method IDs
/*MGEN:Class.MethodIdDeclarations*/
// Accessor Methods
/*MGEN:Class.AccessorMethods*/
- // Event Methods
-/*MGEN:Class.EventMethodDecls*/
};
}/*MGEN:Class.CloseNamespaces*/
diff --git a/cpp/managementgen/qmf/templates/Event.cpp b/cpp/managementgen/qmf/templates/Event.cpp
new file mode 100644
index 0000000000..cdb40c6d79
--- /dev/null
+++ b/cpp/managementgen/qmf/templates/Event.cpp
@@ -0,0 +1,77 @@
+/*MGEN:commentPrefix=//*/
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+/*MGEN:Root.Disclaimer*/
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/agent/ManagementAgent.h"
+#include "Event/*MGEN:Event.NameCap*/.h"
+
+using namespace qmf::/*MGEN:Event.Namespace*/;
+using namespace qpid::framing;
+using qpid::management::ManagementAgent;
+using qpid::management::Manageable;
+using qpid::management::ManagementObject;
+using qpid::management::Args;
+using std::string;
+
+string Event/*MGEN:Event.NameCap*/::packageName = string ("/*MGEN:Event.NamePackageLower*/");
+string Event/*MGEN:Event.NameCap*/::eventName = string ("/*MGEN:Event.Name*/");
+uint8_t Event/*MGEN:Event.NameCap*/::md5Sum[16] =
+ {/*MGEN:Event.SchemaMD5*/};
+
+Event/*MGEN:Event.NameCap*/::Event/*MGEN:Event.NameCap*/ (/*MGEN:Event.ConstructorArgs*/) :
+ /*MGEN:Event.ConstructorInits*/
+{}
+
+namespace {
+ const string NAME("name");
+ const string TYPE("type");
+ const string DESC("desc");
+ const string ARGCOUNT("argCount");
+ const string ARGS("args");
+}
+
+void Event/*MGEN:Event.NameCap*/::registerSelf(ManagementAgent* agent)
+{
+ agent->registerEvent(packageName, eventName, md5Sum, writeSchema);
+}
+
+void Event/*MGEN:Event.NameCap*/::writeSchema (Buffer& buf)
+{
+ FieldTable ft;
+
+ // Schema class header:
+ buf.putOctet (CLASS_KIND_EVENT);
+ buf.putShortString (packageName); // Package Name
+ buf.putShortString (eventName); // Event Name
+ buf.putBin128 (md5Sum); // Schema Hash
+ buf.putShort (/*MGEN:Event.ArgCount*/); // Argument Count
+
+ // Arguments
+/*MGEN:Event.ArgSchema*/
+}
+
+void Event/*MGEN:Event.NameCap*/::encode(::qpid::framing::Buffer& buf) const
+{
+/*MGEN:Event.ArgEncodes*/
+}
diff --git a/cpp/managementgen/qmf/templates/Event.h b/cpp/managementgen/qmf/templates/Event.h
new file mode 100644
index 0000000000..a943c0c501
--- /dev/null
+++ b/cpp/managementgen/qmf/templates/Event.h
@@ -0,0 +1,58 @@
+/*MGEN:commentPrefix=//*/
+#ifndef _MANAGEMENT_/*MGEN:Event.NameUpper*/_
+#define _MANAGEMENT_/*MGEN:Event.NameUpper*/_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+/*MGEN:Root.Disclaimer*/
+
+#include "qpid/management/ManagementEvent.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/Uuid.h"
+
+namespace qmf {
+/*MGEN:Event.OpenNamespaces*/
+
+class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
+{
+ private:
+ static void writeSchema (::qpid::framing::Buffer& buf);
+ static std::string packageName;
+ static std::string eventName;
+ static uint8_t md5Sum[16];
+
+/*MGEN:Event.ArgDeclarations*/
+
+ public:
+ writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
+
+ Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/);
+ ~Event/*MGEN:Class.NameCap*/() {};
+
+ static void registerSelf(::qpid::management::ManagementAgent* agent);
+ std::string& getPackageName() const { return packageName; }
+ std::string& getEventName() const { return eventName; }
+ uint8_t* getMd5Sum() const { return md5Sum; }
+ void encode(::qpid::framing::Buffer& buffer) const;
+};
+
+}/*MGEN:Event.CloseNamespaces*/
+
+#endif /*!_MANAGEMENT_/*MGEN:Event.NameUpper*/_*/
diff --git a/cpp/src/qpid/acl/Acl.cpp b/cpp/src/qpid/acl/Acl.cpp
index 0a793c88e0..bc932d836c 100644
--- a/cpp/src/qpid/acl/Acl.cpp
+++ b/cpp/src/qpid/acl/Acl.cpp
@@ -25,6 +25,10 @@
#include "qpid/shared_ptr.h"
#include "qpid/log/Logger.h"
#include "qmf/org/apache/qpid/acl/Package.h"
+#include "qmf/org/apache/qpid/acl/EventAllow.h"
+#include "qmf/org/apache/qpid/acl/EventDeny.h"
+#include "qmf/org/apache/qpid/acl/EventFileLoaded.h"
+#include "qmf/org/apache/qpid/acl/EventFileLoadFailed.h"
#include <map>
@@ -41,7 +45,7 @@ namespace _qmf = qmf::org::apache::qpid::acl;
Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transferAcl(false)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
_qmf::Package packageInit(agent);
@@ -86,7 +90,11 @@ Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transfer
switch (aclreslt)
{
case ALLOWLOG:
- QPID_LOG(info, "ACL Allow id:" << id <<" action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name );
+ QPID_LOG(info, "ACL Allow id:" << id <<" action:" << AclHelper::getActionStr(action) <<
+ " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name );
+ agent->raiseEvent(_qmf::EventAllow(id, AclHelper::getActionStr(action),
+ AclHelper::getObjectTypeStr(objType),
+ name, framing::FieldTable()));
case ALLOW:
return true;
case DENY:
@@ -94,13 +102,12 @@ Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transfer
return false;
case DENYLOG:
if (mgmtObject!=0) mgmtObject->inc_aclDenyCount();
- default:
- QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name);
- if (mgmtObject!=0){
- framing::FieldTable _params;
- mgmtObject->event_aclEvent(1, id, AclHelper::getActionStr(action),AclHelper::getObjectTypeStr(objType),name, _params);
- }
- return false;
+ default:
+ QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name);
+ agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action),
+ AclHelper::getObjectTypeStr(objType),
+ name, framing::FieldTable()));
+ return false;
}
return false;
}
@@ -115,7 +122,7 @@ Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transfer
boost::shared_ptr<AclData> d(new AclData);
AclReader ar;
if (ar.read(aclFile, d)){
- mgmtObject->event_fileNotLoaded("","See log for file load reason failure");
+ agent->raiseEvent(_qmf::EventFileLoadFailed("", "See log for file load reason failure"));
return false;
}
@@ -127,7 +134,7 @@ Acl::Acl (AclValues& av, broker::Broker& b): aclValues(av), broker(&b), transfer
sys::AbsTime now = sys::AbsTime::now();
int64_t ns = sys::Duration(now);
mgmtObject->set_lastAclLoad(ns);
- mgmtObject->event_fileLoaded("");
+ agent->raiseEvent(_qmf::EventFileLoaded(""));
}
return true;
}
diff --git a/cpp/src/qpid/acl/Acl.h b/cpp/src/qpid/acl/Acl.h
index fe1c1500bb..8a3825f683 100644
--- a/cpp/src/qpid/acl/Acl.h
+++ b/cpp/src/qpid/acl/Acl.h
@@ -27,6 +27,7 @@
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
#include "qpid/management/Manageable.h"
+#include "qpid/agent/ManagementAgent.h"
#include "qmf/org/apache/qpid/acl/Acl.h"
#include <map>
@@ -57,7 +58,7 @@ private:
bool transferAcl;
boost::shared_ptr<AclData> data;
qmf::org::apache::qpid::acl::Acl* mgmtObject; // mgnt owns lifecycle
-
+ qpid::management::ManagementAgent* agent;
public:
Acl (AclValues& av, broker::Broker& b);
diff --git a/cpp/src/qpid/acl/management-schema.xml b/cpp/src/qpid/acl/management-schema.xml
index 7d20353755..f362561356 100644
--- a/cpp/src/qpid/acl/management-schema.xml
+++ b/cpp/src/qpid/acl/management-schema.xml
@@ -17,32 +17,28 @@
-->
<class name="acl">
- <property name="brokerRef" type="objId" references="qpid.Broker" access="RO" index="y" parentRef="y"/>
- <property name="policyFile" type="sstr" access="RO" desc="Name of the policy file"/>
- <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/>
- <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/>
- <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/>
- <statistic name="aclDenyCount" type="count64" unit="record" desc="Number of ACL requests denied"/>
+ <property name="brokerRef" type="objId" references="org.apache.qpid.broker:Broker" access="RO" index="y" parentRef="y"/>
+ <property name="policyFile" type="sstr" access="RO" desc="Name of the policy file"/>
+ <property name="enforcingAcl" type="bool" access="RO" desc="Currently Enforcing ACL"/>
+ <property name="transferAcl" type="bool" access="RO" desc="Any transfer ACL rules in force"/>
+ <property name="lastAclLoad" type="absTime" access="RO" desc="Timestamp of last successful load of ACL"/>
+ <statistic name="aclDenyCount" type="count64" unit="request" desc="Number of ACL requests denied"/>
<method name="reloadACLFile" desc="Reload the ACL file"/>
+ </class>
- <event name="aclEvent" defaultSeverity="info" desc="Event generated by the ACL policy">
- <arg name="denied" type="bool"/>
- <arg name="authId" type="sstr"/>
- <arg name="action" type="sstr"/>
- <arg name="objType" type="sstr"/>
- <arg name="name" type="sstr"/>
- <arg name="params" type="map"/>
- </event>
-
- <event name="fileLoaded" defaultSeverity="warning" desc="ACL file successfully loaded - New policy in effect">
- <arg name="authId" type="sstr" desc="Name of user who initiated the file load"/>
- </event>
+ <eventArguments>
+ <arg name="action" type="sstr"/>
+ <arg name="arguments" type="map"/>
+ <arg name="objectName" type="sstr"/>
+ <arg name="objectType" type="sstr"/>
+ <arg name="reason" type="sstr"/>
+ <arg name="userId" type="sstr"/>
+ </eventArguments>
- <event name="fileNotLoaded" defaultSeverity="error" desc="Replacement ACL file could not be loaded">
- <arg name="authId" type="sstr" desc="Name of user who initiated the file load"/>
- <arg name="reason" type="sstr" desc="Reason for failure"/>
- </event>
- </class>
+ <event name="allow" args="userId, action, objectType, objectName, arguments"/>
+ <event name="deny" args="userId, action, objectType, objectName, arguments"/>
+ <event name="fileLoaded" args="userId"/>
+ <event name="fileLoadFailed" args="userId, reason"/>
</schema>
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h
index 1c219f7463..6af9abc26b 100644
--- a/cpp/src/qpid/agent/ManagementAgent.h
+++ b/cpp/src/qpid/agent/ManagementAgent.h
@@ -21,6 +21,7 @@
//
#include "qpid/management/ManagementObject.h"
+#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/Mutex.h"
@@ -43,8 +44,8 @@ class ManagementAgent
static ManagementAgent* agent;
};
- ManagementAgent () {}
- virtual ~ManagementAgent () {}
+ ManagementAgent() {}
+ virtual ~ManagementAgent() {}
virtual int getMaxThreads() = 0;
@@ -78,10 +79,16 @@ class ManagementAgent
// package initializer generated by the management code generator.
//
virtual void
- RegisterClass (std::string packageName,
- std::string className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall) = 0;
+ registerClass(std::string& packageName,
+ std::string& className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall) = 0;
+
+ virtual void
+ registerEvent(std::string& packageName,
+ std::string& eventName,
+ uint8_t* md5Sum,
+ management::ManagementEvent::writeSchemaCall_t schemaCall) = 0;
// Add a management object to the agent. Once added, this object shall be visible
// in the greater management context.
@@ -97,8 +104,11 @@ class ManagementAgent
// pointer. This allows the management agent to report the deletion of the object
// in an orderly way.
//
- virtual ObjectId addObject (ManagementObject* objectPtr,
- uint64_t persistId = 0) = 0;
+ virtual ObjectId addObject(ManagementObject* objectPtr, uint64_t persistId = 0) = 0;
+
+ //
+ //
+ virtual void raiseEvent(const ManagementEvent& event) = 0;
// If "useExternalThread" was set to true in init, this method must
// be called to provide a thread for any pending method calls that have arrived.
@@ -113,7 +123,7 @@ class ManagementAgent
// to pollCallbacks are necessary to clear the backlog. If callLimit is zero,
// the return value will also be zero.
//
- virtual uint32_t pollCallbacks (uint32_t callLimit = 0) = 0;
+ virtual uint32_t pollCallbacks(uint32_t callLimit = 0) = 0;
// If "useExternalThread" was set to true in the constructor, this method provides
// a standard file descriptor that can be used in a select statement to signal that
@@ -121,14 +131,7 @@ class ManagementAgent
// least one method call). When this fd is ready-for-read, pollCallbacks may be
// invoked. Calling pollCallbacks shall reset the ready-to-read state of the fd.
//
- virtual int getSignalFd (void) = 0;
-
-protected:
- friend class ManagementObject;
- virtual sys::Mutex& getMutex() = 0;
- virtual framing::Buffer* startEventLH() = 0;
- virtual void finishEventLH(framing::Buffer* buf) = 0;
-
+ virtual int getSignalFd() = 0;
};
}}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 6a1542b1f2..b178d0fc28 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -75,12 +75,13 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
return agent;
}
-const string ManagementAgentImpl::storeMagicNumber("MA01");
+const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
extThread(false), writeFd(-1), readFd(-1),
- clientWasAdded(true), requestedBank(0),
- assignedBank(0), brokerBank(0), bootSequence(0),
+ connected(false), lastFailure("never connected"),
+ clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
+ assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
connThreadBody(*this), connThread(connThreadBody),
pubThreadBody(*this), pubThread(pubThreadBody)
{
@@ -122,18 +123,24 @@ void ManagementAgentImpl::init(string brokerHost,
storeData(true);
}
-ManagementAgentImpl::~ManagementAgentImpl()
-{
+void ManagementAgentImpl::registerClass(std::string& packageName,
+ std::string& className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = findOrAddPackage(packageName);
+ addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
}
-void ManagementAgentImpl::RegisterClass(std::string packageName,
- std::string className,
- uint8_t* md5Sum,
+void ManagementAgentImpl::registerEvent(std::string& packageName,
+ std::string& eventName,
+ uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = FindOrAddPackage(packageName);
- AddClassLocal(pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = findOrAddPackage(packageName);
+ addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
@@ -151,6 +158,23 @@ ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
return objectId;
}
+void ManagementAgentImpl::raiseEvent(const ManagementEvent& event)
+{
+ Mutex::ScopedLock lock(agentLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ event.encode(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.event");
+}
+
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
{
Mutex::ScopedLock lock(agentLock);
@@ -184,10 +208,12 @@ void ManagementAgentImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
- EncodeHeader(buffer, 'A');
+ connected = true;
+ encodeHeader(buffer, 'A');
buffer.putShortString("RemoteAgent [C++]");
systemId.encode (buffer);
- buffer.putLong(requestedBank);
+ buffer.putLong(requestedBrokerBank);
+ buffer.putLong(requestedAgentBank);
uint32_t length = 512 - buffer.available();
buffer.reset();
connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
@@ -197,10 +223,12 @@ void ManagementAgentImpl::storeData(bool requested)
{
if (!storeFile.empty()) {
ofstream outFile(storeFile.c_str());
- uint32_t bankToWrite = requested ? requestedBank : assignedBank;
+ uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank;
+ uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank;
if (outFile.good()) {
- outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl;
+ outFile << storeMagicNumber << " " << brokerBankToWrite << " " <<
+ agentBankToWrite << " " << bootSequence << endl;
outFile.close();
}
}
@@ -215,7 +243,8 @@ void ManagementAgentImpl::retrieveData()
if (inFile.good()) {
inFile >> mn;
if (mn == storeMagicNumber) {
- inFile >> requestedBank;
+ inFile >> requestedBrokerBank;
+ inFile >> requestedAgentBank;
inFile >> bootSequence;
}
inFile.close();
@@ -229,7 +258,7 @@ void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequen
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'z', sequence);
+ encodeHeader(outBuffer, 'z', sequence);
outBuffer.putLong(code);
outBuffer.putShortString(text);
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -241,20 +270,23 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
{
Mutex::ScopedLock lock(agentLock);
- brokerBank = inBuffer.getLong();
- assignedBank = inBuffer.getLong();
- if (assignedBank != requestedBank) {
- if (requestedBank == 0)
- cout << "Initial object-id bank assigned: " << assignedBank << endl;
+ assignedBrokerBank = inBuffer.getLong();
+ assignedAgentBank = inBuffer.getLong();
+ if ((assignedBrokerBank != requestedBrokerBank) ||
+ (assignedAgentBank != requestedAgentBank)) {
+ if (requestedAgentBank == 0)
+ cout << "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
+ assignedAgentBank << endl;
else
- cout << "Collision in object-id! New bank assigned: " << assignedBank << endl;
+ cout << "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
+ "." << assignedAgentBank << endl;
storeData();
}
- attachment.setBanks(brokerBank, assignedBank);
+ attachment.setBanks(assignedBrokerBank, assignedAgentBank);
// Bind to qpid.management to receive commands
- connThreadBody.bindToBank(assignedBank);
+ connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
// Send package indications for all local packages
for (PackageMap::iterator pIter = packages.begin();
@@ -263,8 +295,8 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'p');
- EncodePackageIndication(outBuffer, pIter);
+ encodeHeader(outBuffer, 'p');
+ encodePackageIndication(outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
@@ -273,8 +305,8 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
ClassMap cMap = pIter->second;
for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
outBuffer.reset();
- EncodeHeader(outBuffer, 'q');
- EncodeClassIndication(outBuffer, pIter, cIter);
+ encodeHeader(outBuffer, 'q');
+ encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
@@ -294,14 +326,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
- ClassMap cMap = pIter->second;
+ ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- SchemaClass schema = cIter->second;
+ SchemaClass& schema = cIter->second;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 's', sequence);
+ encodeHeader(outBuffer, 's', sequence);
schema.writeSchemaCall(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
@@ -331,7 +363,7 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
@@ -344,7 +376,14 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
}
else
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ try {
+ outBuffer.record();
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
+ } catch(std::exception& e) {
+ outBuffer.restore();
+ outBuffer.putLong(Manageable::STATUS_EXCEPTION);
+ outBuffer.putShortString(e.what());
+ }
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -379,7 +418,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'g', sequence);
+ encodeHeader(outBuffer, 'g', sequence);
object->writeProperties(outBuffer);
object->writeStatistics(outBuffer, true);
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -419,7 +458,7 @@ void ManagementAgentImpl::received(Message& msg)
replyToKey = rt.getRoutingKey();
}
- if (CheckHeader(inBuffer, &opcode, &sequence))
+ if (checkHeader(inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
@@ -429,16 +468,16 @@ void ManagementAgentImpl::received(Message& msg)
}
}
-void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet('A');
buf.putOctet('M');
- buf.putOctet('1');
+ buf.putOctet('2');
buf.putOctet(opcode);
buf.putLong (seq);
}
-bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
if (buf.getSize() < 8)
return false;
@@ -450,10 +489,10 @@ bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *se
*opcode = buf.getOctet();
*seq = buf.getLong();
- return h1 == 'A' && h2 == 'M' && h3 == '1';
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name)
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name)
{
PackageMap::iterator pIter = packages.find(name);
if (pIter != packages.end())
@@ -467,8 +506,8 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'p');
- EncodePackageIndication(outBuffer, result.first);
+ encodeHeader(outBuffer, 'p');
+ encodePackageIndication(outBuffer, result.first);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package");
@@ -486,8 +525,9 @@ void ManagementAgentImpl::moveNewObjectsLH()
newManagementObjects.clear();
}
-void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter,
- string className,
+void ManagementAgentImpl::addClassLocal(uint8_t classKind,
+ PackageMap::iterator pIter,
+ const string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall)
{
@@ -502,32 +542,28 @@ void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter,
return;
// No such class found, create a new class with local information.
- SchemaClass classInfo;
-
- classInfo.writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
+ cMap.insert(std::pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind)));
}
-void ManagementAgentImpl::EncodePackageIndication(Buffer& buf,
+void ManagementAgentImpl::encodePackageIndication(Buffer& buf,
PackageMap::iterator pIter)
{
buf.putShortString((*pIter).first);
}
-void ManagementAgentImpl::EncodeClassIndication(Buffer& buf,
+void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter)
{
SchemaClassKey key = (*cIter).first;
+ buf.putOctet((*cIter).second.kind);
buf.putShortString((*pIter).first);
buf.putShortString(key.name);
- buf.putBin128 (key.hash);
+ buf.putBin128(key.hash);
}
-void ManagementAgentImpl::PeriodicProcessing()
+void ManagementAgentImpl::periodicProcessing()
{
#define BUFSIZE 65536
Mutex::ScopedLock lock(agentLock);
@@ -536,9 +572,12 @@ void ManagementAgentImpl::PeriodicProcessing()
string routingKey;
std::list<ObjectId> deleteList;
+ if (!connected)
+ return;
+
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'h');
+ encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
contentSize = BUFSIZE - msgBuffer.available();
@@ -573,7 +612,7 @@ void ManagementAgentImpl::PeriodicProcessing()
if (object->getConfigChanged() || object->isDeleted())
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'c');
+ encodeHeader(msgBuffer, 'c');
object->writeProperties(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available();
@@ -585,7 +624,7 @@ void ManagementAgentImpl::PeriodicProcessing()
if (object->getInstChanged())
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'i');
+ encodeHeader(msgBuffer, 'i');
object->writeStatistics(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available();
@@ -664,8 +703,8 @@ ManagementAgentImpl::ConnectionThread::~ConnectionThread()
void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
uint32_t length,
- string exchange,
- string routingKey)
+ const string& exchange,
+ const string& routingKey)
{
{
Mutex::ScopedLock _lock(connLock);
@@ -683,10 +722,10 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
session.messageTransfer(arg::content=msg, arg::destination=exchange);
}
-void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank)
+void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
{
stringstream key;
- key << "agent." << agentBank;
+ key << "agent." << brokerBank << "." << agentBank;
session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(),
arg::bindingKey=key.str());
}
@@ -695,28 +734,7 @@ void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank)
void ManagementAgentImpl::PublishThread::run()
{
while (true) {
- ::sleep(5);
- agent.PeriodicProcessing();
+ ::sleep(agent.getInterval());
+ agent.periodicProcessing();
}
}
-
-Mutex& ManagementAgentImpl::getMutex()
-{
- return agentLock;
-}
-
-Buffer* ManagementAgentImpl::startEventLH()
-{
- Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
- EncodeHeader(*outBuffer, 'e');
- outBuffer->putLongLong(uint64_t(Duration(now())));
- return outBuffer;
-}
-
-void ManagementAgentImpl::finishEventLH(Buffer* outBuffer)
-{
- uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
- outBuffer->reset();
- connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event");
- delete outBuffer;
-}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index 7d9be6daf9..a964694690 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -43,24 +43,34 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
public:
ManagementAgentImpl();
- virtual ~ManagementAgentImpl();
+ virtual ~ManagementAgentImpl() {};
+ //
+ // Methods from ManagementAgent
+ //
int getMaxThreads() { return 1; }
void init(std::string brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
bool useExternalThread = false,
std::string storeFile = "");
- void RegisterClass(std::string packageName,
- std::string className,
- uint8_t* md5Sum,
+ bool isConnected() { return connected; }
+ std::string& getLastFailure() { return lastFailure; }
+ void registerClass(std::string& packageName,
+ std::string& className,
+ uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
- ObjectId addObject (management::ManagementObject* objectPtr,
- uint64_t persistId = 0);
- uint32_t pollCallbacks (uint32_t callLimit = 0);
- int getSignalFd (void);
+ void registerEvent(std::string& packageName,
+ std::string& eventName,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
+ void raiseEvent(const management::ManagementEvent& event);
+ uint32_t pollCallbacks(uint32_t callLimit = 0);
+ int getSignalFd();
- void PeriodicProcessing();
+ uint16_t getInterval() { return interval; }
+ void periodicProcessing();
private:
@@ -84,8 +94,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
struct SchemaClass {
management::ManagementObject::writeSchemaCall_t writeSchemaCall;
+ uint8_t kind;
- SchemaClass () : writeSchemaCall(0) {}
+ SchemaClass(const management::ManagementObject::writeSchemaCall_t call,
+ const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {}
};
struct QueuedMethod {
@@ -120,12 +132,16 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
framing::Uuid systemId;
std::string host;
uint16_t port;
+ bool connected;
+ std::string lastFailure;
+
+ bool clientWasAdded;
+ uint32_t requestedBrokerBank;
+ uint32_t requestedAgentBank;
+ uint32_t assignedBrokerBank;
+ uint32_t assignedAgentBank;
+ uint16_t bootSequence;
- bool clientWasAdded;
- uint32_t requestedBank;
- uint32_t assignedBank;
- uint32_t brokerBank;
- uint16_t bootSequence;
# define MA_BUFFER_SIZE 65536
char outputBuffer[MA_BUFFER_SIZE];
char eventBuffer[MA_BUFFER_SIZE];
@@ -148,9 +164,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
~ConnectionThread();
void sendBuffer(qpid::framing::Buffer& buf,
uint32_t length,
- std::string exchange,
- std::string routingKey);
- void bindToBank(uint32_t agentBank);
+ const std::string& exchange,
+ const std::string& routingKey);
+ void bindToBank(uint32_t brokerBank, uint32_t agentBank);
};
class PublishThread : public sys::Runnable
@@ -171,19 +187,20 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void startProtocol();
void storeData(bool requested=false);
void retrieveData();
- PackageMap::iterator FindOrAddPackage (std::string name);
+ PackageMap::iterator findOrAddPackage(const std::string& name);
void moveNewObjectsLH();
- void AddClassLocal (PackageMap::iterator pIter,
- std::string className,
+ void addClassLocal (uint8_t classKind,
+ PackageMap::iterator pIter,
+ const std::string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
- void EncodePackageIndication (qpid::framing::Buffer& buf,
+ void encodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
- void EncodeClassIndication (qpid::framing::Buffer& buf,
+ void encodeClassIndication (framing::Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter);
- void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
void handleAttachResponse (qpid::framing::Buffer& inBuffer);
@@ -194,9 +211,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleConsoleAddedIndication();
- sys::Mutex& getMutex();
- framing::Buffer* startEventLH();
- void finishEventLH(framing::Buffer* outBuffer);
};
}}
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 38affad51f..3ed41a3f2d 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -72,6 +72,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
if (agent != 0)
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
agent->addObject(mgmtObject);
+ ConnectionState::setUrl(mgmtId);
}
}
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index aac31bbf96..c04bd46f72 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -63,6 +63,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable
virtual void setUserId(const string& uid) { userId = uid; }
const string& getUserId() const { return userId; }
+ void setUrl(const string& _url) { url = _url; }
+ const string& getUrl() const { return url; }
+
void setFederationLink(bool b) { federationLink = b; }
bool isFederationLink() const { return federationLink; }
@@ -85,6 +88,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
uint16_t heartbeat;
uint64_t stagingThreshold;
string userId;
+ string url;
bool federationLink;
};
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 64b0b80446..9c23abeba9 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -26,6 +26,15 @@
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/exceptions.h"
#include "qpid/framing/SequenceSet.h"
+#include "qpid/agent/ManagementAgent.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventBind.h"
+#include "qmf/org/apache/qpid/broker/EventUnbind.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -36,6 +45,8 @@ namespace broker {
using namespace qpid;
using namespace qpid::framing;
using namespace qpid::framing::dtx;
+using namespace qpid::management;
+namespace _qmf = qmf::org::apache::qpid::broker;
typedef std::vector<Queue::shared_ptr> QueueVector;
@@ -54,18 +65,17 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
const string& alternateExchange,
bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::TYPE, type));
- params.insert(make_pair(acl::ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PASSIVE, std::string(passive ? "true" : "false") ));
- params.insert(make_pair(acl::DURABLE, std::string(durable ? "true" : "false")));
- if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchange,&params) )
- throw NotAllowedException("ACL denied exhange declare request");
- }
-
+ params.insert(make_pair(acl::TYPE, type));
+ params.insert(make_pair(acl::ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PASSIVE, std::string(passive ? "true" : "false") ));
+ params.insert(make_pair(acl::DURABLE, std::string(durable ? "true" : "false")));
+ if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchange,&params) )
+ throw NotAllowedException("ACL denied exhange declare request");
+ }
+
//TODO: implement autoDelete
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
@@ -90,6 +100,13 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
checkType(response.first, type);
checkAlternate(response.first, alternate);
}
+
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ if (agent)
+ agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
+ alternateExchange, durable, false, args,
+ response.second ? "created" : "existing"));
+
}catch(UnknownExchangeTypeException& e){
throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
}
@@ -106,38 +123,37 @@ void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchang
void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
{
if (alternate && alternate != exchange->getAlternate())
- throw NotAllowedException(
- QPID_MSG("Exchange declared with alternate-exchange "
- << exchange->getAlternate()->getName() << ", requested "
- << alternate->getName()));
+ throw NotAllowedException(QPID_MSG("Exchange declared with alternate-exchange "
+ << exchange->getAlternate()->getName() << ", requested "
+ << alternate->getName()));
}
-void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/){
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::EXCHANGE,name,NULL) )
- throw NotAllowedException("ACL denied exhange delete request");
+void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
+{
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::EXCHANGE,name,NULL) )
+ throw NotAllowedException("ACL denied exhange delete request");
}
-
//TODO: implement unused
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
getBroker().getExchanges().destroy(name);
-}
+
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ if (agent)
+ agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name));
+}
ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
{
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::EXCHANGE,name,NULL) )
- throw NotAllowedException("ACL denied exhange query request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::EXCHANGE,name,NULL) )
+ throw NotAllowedException("ACL denied exhange query request");
}
try {
@@ -147,15 +163,15 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam
return ExchangeQueryResult("", false, true, FieldTable());
}
}
+
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
const string& exchangeName, const string& routingKey,
- const FieldTable& arguments){
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::BIND,acl::EXCHANGE,exchangeName,routingKey) )
- throw NotAllowedException("ACL denied exhange bind request");
+ const FieldTable& arguments)
+{
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ if (!acl->authorise(getConnection().getUserId(),acl::BIND,acl::EXCHANGE,exchangeName,routingKey) )
+ throw NotAllowedException("ACL denied exhange bind request");
}
Queue::shared_ptr queue = getQueue(queueName);
@@ -167,30 +183,29 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
if (exchange->isDurable() && queue->isDurable()) {
getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
}
+
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ if (agent)
+ agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments));
}
}else{
- throw NotFoundException(
- "Bind failed. No such exchange: " + exchangeName);
+ throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
}
}
-void
-SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
- const string& exchangeName,
- const string& routingKey)
+void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
+ const string& exchangeName,
+ const string& routingKey)
{
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::QUEUENAME, queueName));
- params.insert(make_pair(acl::ROUTINGKEY, routingKey));
- if (!acl->authorise(getConnection().getUserId(),acl::UNBIND,acl::EXCHANGE,exchangeName,&params) )
- throw NotAllowedException("ACL denied exchange unbind request");
+ params.insert(make_pair(acl::QUEUENAME, queueName));
+ params.insert(make_pair(acl::ROUTINGKEY, routingKey));
+ if (!acl->authorise(getConnection().getUserId(),acl::UNBIND,acl::EXCHANGE,exchangeName,&params) )
+ throw NotAllowedException("ACL denied exchange unbind request");
}
-
Queue::shared_ptr queue = getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
@@ -198,10 +213,14 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
//TODO: revise unbind to rely solely on binding key (not args)
- if (exchange->unbind(queue, routingKey, 0) && exchange->isDurable() && queue->isDurable()) {
- getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
- }
+ if (exchange->unbind(queue, routingKey, 0)) {
+ if (exchange->isDurable() && queue->isDurable())
+ getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ if (agent)
+ agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey));
+ }
}
ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -209,16 +228,15 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
const std::string& key,
const framing::FieldTable& args)
{
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::QUEUENAME, queueName));
- params.insert(make_pair(acl::ROUTINGKEY, key));
- if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchangeName,&params) )
- throw NotAllowedException("ACL denied exhange bound request");
+ params.insert(make_pair(acl::QUEUENAME, queueName));
+ params.insert(make_pair(acl::ROUTINGKEY, key));
+ if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::EXCHANGE,exchangeName,&params) )
+ throw NotAllowedException("ACL denied exhange bound request");
}
-
+
Exchange::shared_ptr exchange;
try {
exchange = getBroker().getExchanges().get(exchangeName);
@@ -279,13 +297,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::QUEUE,name,NULL) )
- throw NotAllowedException("ACL denied queue query request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
+ if (!acl->authorise(getConnection().getUserId(),acl::ACCESS,acl::QUEUE,name,NULL) )
+ throw NotAllowedException("ACL denied queue query request");
}
-
+
Queue::shared_ptr queue = session.getBroker().getQueues().find(name);
if (queue) {
@@ -305,20 +322,19 @@ QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
}
void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, const qpid::framing::FieldTable& arguments){
-
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, const qpid::framing::FieldTable& arguments)
+{
+ AclModule* acl = getBroker().getAcl();
+ if (acl) {
std::map<acl::Property, std::string> params;
- params.insert(make_pair(acl::ALTERNATE, alternateExchange));
- params.insert(make_pair(acl::PASSIVE, std::string(passive ? "true" : "false") ));
- params.insert(make_pair(acl::DURABLE, std::string(durable ? "true" : "false")));
- params.insert(make_pair(acl::EXCLUSIVE, std::string(exclusive ? "true" : "false")));
- params.insert(make_pair(acl::AUTODELETE, std::string(autoDelete ? "true" : "false")));
- if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::QUEUE,name,&params) )
- throw NotAllowedException("ACL denied queue create request");
+ params.insert(make_pair(acl::ALTERNATE, alternateExchange));
+ params.insert(make_pair(acl::PASSIVE, std::string(passive ? "true" : "false") ));
+ params.insert(make_pair(acl::DURABLE, std::string(durable ? "true" : "false")));
+ params.insert(make_pair(acl::EXCLUSIVE, std::string(exclusive ? "true" : "false")));
+ params.insert(make_pair(acl::AUTODELETE, std::string(autoDelete ? "true" : "false")));
+ if (!acl->authorise(getConnection().getUserId(),acl::CREATE,acl::QUEUE,name,&params) )
+ throw NotAllowedException("ACL denied queue create request");
}
Exchange::shared_ptr alternate;
@@ -327,17 +343,16 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getQueue(name);
+ queue = getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().getQueues().declare(
- name, durable,
- autoDelete,
- exclusive ? this : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ getBroker().getQueues().declare(name, durable,
+ autoDelete,
+ exclusive ? this : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -346,44 +361,50 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
//apply settings & create persistent record if required
queue_created.first->create(arguments);
- //add default binding:
- getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+ //add default binding:
+ getBroker().getExchanges().getDefault()->bind(queue, name, 0);
queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
//handle automatic cleanup:
- if (exclusive) {
- exclusiveQueues.push_back(queue);
- }
- } else {
+ if (exclusive) {
+ exclusiveQueues.push_back(queue);
+ }
+ } else {
if (exclusive && queue->setExclusiveOwner(this)) {
- exclusiveQueues.push_back(queue);
+ exclusiveQueues.push_back(queue);
}
}
+
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ if (agent)
+ agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
+ name, durable, exclusive, autoDelete, arguments,
+ queue_created.second ? "created" : "existing"));
}
+
if (exclusive && !queue->isExclusiveOwner(this))
- throw ResourceLockedException(
- QPID_MSG("Cannot grant exclusive access to queue "
- << queue->getName()));
+ throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access to queue "
+ << queue->getName()));
}
void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::PURGE,acl::QUEUE,queue,NULL) )
- throw NotAllowedException("ACL denied queue purge request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl)
+ {
+ if (!acl->authorise(getConnection().getUserId(),acl::PURGE,acl::QUEUE,queue,NULL) )
+ throw NotAllowedException("ACL denied queue purge request");
}
getQueue(queue)->purge();
}
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::QUEUE,queue,NULL) )
- throw NotAllowedException("ACL denied queue delete request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl)
+ {
+ if (!acl->authorise(getConnection().getUserId(),acl::DELETE,acl::QUEUE,queue,NULL) )
+ throw NotAllowedException("ACL denied queue delete request");
}
ChannelException error(0, "");
@@ -401,6 +422,10 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
q->destroy();
getBroker().getQueues().destroy(queue);
q->unbind(getBroker().getExchanges(), q);
+
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ if (agent)
+ agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue));
}
}
@@ -441,12 +466,12 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
const FieldTable& arguments)
{
- AclModule* acl = getBroker().getAcl();
- if (acl)
- {
- // add flags as needed
- if (!acl->authorise(getConnection().getUserId(),acl::CONSUME,acl::QUEUE,queueName,NULL) )
- throw NotAllowedException("ACL denied Queue subscribe request");
+ AclModule* acl = getBroker().getAcl();
+ if (acl)
+ {
+ // add flags as needed
+ if (!acl->authorise(getConnection().getUserId(),acl::CONSUME,acl::QUEUE,queueName,NULL) )
+ throw NotAllowedException("ACL denied Queue subscribe request");
}
Queue::shared_ptr queue = getQueue(queueName);
@@ -457,12 +482,21 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode),
tag, queue, false, //TODO get rid of no-local
acceptMode == 0, acquireMode == 0, exclusive, &arguments);
+
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ if (agent)
+ agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
+ queueName, destination, exclusive, arguments));
}
void
SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
{
state.cancel(destination);
+
+ ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
+ if (agent)
+ agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
}
void
diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h
index 4f482dc206..828e6e963a 100644
--- a/cpp/src/qpid/framing/Buffer.h
+++ b/cpp/src/qpid/framing/Buffer.h
@@ -28,7 +28,9 @@
namespace qpid {
namespace framing {
-struct OutOfBounds : qpid::Exception {};
+struct OutOfBounds : qpid::Exception {
+ OutOfBounds() : qpid::Exception(std::string("Out of Bounds")) {}
+};
class Content;
class FieldTable;
diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h
index afa5c968f7..b4d80d8fad 100644
--- a/cpp/src/qpid/management/Manageable.h
+++ b/cpp/src/qpid/management/Manageable.h
@@ -45,6 +45,7 @@ class Manageable
static const status_t STATUS_INVALID_PARAMETER = 4;
static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5;
static const status_t STATUS_FORBIDDEN = 6;
+ static const status_t STATUS_EXCEPTION = 7;
static const status_t STATUS_USER = 0x00010000;
// Every "Manageable" object must hold a reference to exactly one
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 3ae98e8264..0e046bb813 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -179,14 +179,24 @@ void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchang
dExchange = _dexchange;
}
-void ManagementBroker::RegisterClass (string packageName,
- string className,
+void ManagementBroker::registerClass (string& packageName,
+ string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(userLock);
- PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
- AddClass(pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
+}
+
+void ManagementBroker::registerEvent (string& packageName,
+ string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
ObjectId ManagementBroker::addObject (ManagementObject* object,
@@ -211,6 +221,23 @@ ObjectId ManagementBroker::addObject (ManagementObject* object,
return objId;
}
+void ManagementBroker::raiseEvent(const ManagementEvent& event)
+{
+ Mutex::ScopedLock lock (userLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ event.encode(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, mExchange, "mgmt.event");
+}
+
ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
: TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {}
@@ -219,7 +246,7 @@ ManagementBroker::Periodic::~Periodic () {}
void ManagementBroker::Periodic::fire ()
{
broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
- broker.PeriodicProcessing ();
+ broker.periodicProcessing ();
}
void ManagementBroker::clientAdded (void)
@@ -233,35 +260,35 @@ void ManagementBroker::clientAdded (void)
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'x');
+ encodeHeader (outBuffer, 'x');
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+ sendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
}
}
-void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
buf.putOctet ('M');
- buf.putOctet ('1');
+ buf.putOctet ('2');
buf.putOctet (opcode);
buf.putLong (seq);
}
-bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
- return h1 == 'A' && h2 == 'M' && h3 == '1';
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementBroker::SendBuffer (Buffer& buf,
+void ManagementBroker::sendBuffer (Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
string routingKey)
@@ -304,7 +331,7 @@ void ManagementBroker::moveNewObjectsLH()
newManagementObjects.clear();
}
-void ManagementBroker::PeriodicProcessing (void)
+void ManagementBroker::periodicProcessing (void)
{
#define BUFSIZE 65536
Mutex::ScopedLock lock (userLock);
@@ -315,13 +342,13 @@ void ManagementBroker::PeriodicProcessing (void)
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'h');
+ encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str() + ".heartbeat";
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
moveNewObjectsLH();
@@ -350,25 +377,25 @@ void ManagementBroker::PeriodicProcessing (void)
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
+ encodeHeader (msgBuffer, 'c');
object->writeProperties(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->getInstChanged ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
+ encodeHeader (msgBuffer, 'i');
object->writeStatistics(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->isDeleted ())
@@ -393,12 +420,12 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'z', sequence);
+ encodeHeader (outBuffer, 'z', sequence);
outBuffer.putLong (code);
outBuffer.putShortString (text);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
@@ -411,7 +438,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
// Parse the routing key. This management broker should act as though it
// is bound to the exchange to match the following keys:
//
- // agent.0.#
+ // agent.1.0.#
// broker
if (routingKey == "broker") {
@@ -419,12 +446,12 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
return false;
}
- else if (routingKey.compare(0, 7, "agent.0") == 0) {
+ else if (routingKey.compare(0, 9, "agent.1.0") == 0) {
dispatchAgentCommandLH(msg);
return false;
}
- else if (routingKey.compare(0, 6, "agent.") == 0) {
+ else if (routingKey.compare(0, 8, "agent.1.") == 0) {
return authorizeAgentMessageLH(msg);
}
@@ -447,7 +474,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
if (acl != 0) {
string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
@@ -460,7 +487,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe
outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
return;
}
}
@@ -476,12 +503,19 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
}
else
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ try {
+ outBuffer.record();
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
+ } catch(std::exception& e) {
+ outBuffer.restore();
+ outBuffer.putLong(Manageable::STATUS_EXCEPTION);
+ outBuffer.putShortString(e.what());
+ }
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -489,12 +523,12 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'b', sequence);
+ encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -506,11 +540,11 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p', sequence);
- EncodePackageIndication (outBuffer, pIter);
+ encodeHeader (outBuffer, 'p', sequence);
+ encodePackageIndication (outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
sendCommandComplete (replyToKey, sequence);
@@ -521,7 +555,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey
std::string packageName;
inBuffer.getShortString(packageName);
- FindOrAddPackageLH(packageName);
+ findOrAddPackageLH(packageName);
}
void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -542,11 +576,11 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'q', sequence);
- EncodeClassIndication(outBuffer, pIter, cIter);
+ encodeHeader(outBuffer, 'q', sequence);
+ encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
}
}
@@ -558,26 +592,27 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui
std::string packageName;
SchemaClassKey key;
+ uint8_t kind = inBuffer.getOctet();
inBuffer.getShortString(packageName);
inBuffer.getShortString(key.name);
inBuffer.getBin128(key.hash);
- PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
ClassMap::iterator cIter = pIter->second.find(key);
if (cIter == pIter->second.end()) {
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
uint32_t sequence = nextRequestSequence++;
- EncodeHeader (outBuffer, 'S', sequence);
+ encodeHeader (outBuffer, 'S', sequence);
outBuffer.putShortString(packageName);
outBuffer.putShortString(key.name);
outBuffer.putBin128(key.hash);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence)));
+ pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence)));
}
}
@@ -612,11 +647,11 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey
SchemaClass& classInfo = cIter->second;
if (classInfo.hasSchema()) {
- EncodeHeader(outBuffer, 's', sequence);
+ encodeHeader(outBuffer, 's', sequence);
classInfo.appendSchema(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
else
sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
@@ -634,9 +669,10 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
SchemaClassKey key;
inBuffer.record();
- inBuffer.getShortString (packageName);
- inBuffer.getShortString (key.name);
- inBuffer.getBin128 (key.hash);
+ inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
inBuffer.restore();
PackageMap::iterator pIter = packages.find(packageName);
@@ -644,7 +680,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
- size_t length = ValidateSchema(inBuffer);
+ size_t length = validateSchema(inBuffer, cIter->second.kind);
if (length == 0) {
QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
@@ -658,11 +694,11 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'q');
- EncodeClassIndication(outBuffer, pIter, cIter);
+ encodeHeader(outBuffer, 'q');
+ encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ sendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
}
}
}
@@ -727,7 +763,7 @@ void ManagementBroker::deleteOrphanedAgentsLH()
void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
- uint32_t requestedBank;
+ uint32_t requestedBrokerBank, requestedAgentBank;
uint32_t assignedBank;
ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
@@ -737,14 +773,15 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
if (aIter != remoteAgents.end()) {
// There already exists an agent on this session. Reject the request.
- sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent");
+ sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
return;
}
- inBuffer.getShortString (label);
- systemId.decode (inBuffer);
- requestedBank = inBuffer.getLong ();
- assignedBank = assignBankLH (requestedBank);
+ inBuffer.getShortString(label);
+ systemId.decode(inBuffer);
+ requestedBrokerBank = inBuffer.getLong();
+ requestedAgentBank = inBuffer.getLong();
+ assignedBank = assignBankLH(requestedAgentBank);
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
@@ -755,7 +792,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
- agent->mgmtObject->set_objectIdBank (assignedBank);
+ agent->mgmtObject->set_brokerBank (brokerBank);
+ agent->mgmtObject->set_agentBank (assignedBank);
addObject (agent->mgmtObject);
remoteAgents[connectionRef] = agent;
@@ -764,12 +802,12 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'a', sequence);
+ encodeHeader (outBuffer, 'a', sequence);
outBuffer.putLong (brokerBank);
outBuffer.putLong (assignedBank);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -799,12 +837,12 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'g', sequence);
+ encodeHeader (outBuffer, 'g', sequence);
object->writeProperties(outBuffer);
object->writeStatistics(outBuffer, true);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
}
@@ -824,7 +862,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
msg.encodeContent(inBuffer);
inBuffer.reset();
- if (!CheckHeader(inBuffer, &opcode, &sequence))
+ if (!checkHeader(inBuffer, &opcode, &sequence))
return false;
if (opcode == 'M') {
@@ -861,12 +899,12 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
return false;
@@ -900,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg)
msg.encodeContent(inBuffer);
inBuffer.reset();
- if (!CheckHeader(inBuffer, &opcode, &sequence))
+ if (!checkHeader(inBuffer, &opcode, &sequence))
return;
if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
@@ -915,7 +953,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg)
else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
}
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(std::string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -930,19 +968,20 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std:
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
+ encodeHeader (outBuffer, 'p');
+ encodePackageIndication (outBuffer, result.first);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
+ sendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
return result.first;
}
-void ManagementBroker::AddClass(PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::addClassLH(uint8_t kind,
+ PackageMap::iterator pIter,
+ string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -958,71 +997,76 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter,
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall)));
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
cIter = cMap.find(key);
}
-void ManagementBroker::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
+void ManagementBroker::encodePackageIndication(Buffer& buf,
+ PackageMap::iterator pIter)
{
- buf.putShortString ((*pIter).first);
+ buf.putShortString((*pIter).first);
}
-void ManagementBroker::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
+void ManagementBroker::encodeClassIndication(Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
{
SchemaClassKey key = (*cIter).first;
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
+ buf.putOctet((*cIter).second.kind);
+ buf.putShortString((*pIter).first);
+ buf.putShortString(key.name);
+ buf.putBin128(key.hash);
+}
+
+size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind)
+{
+ if (kind == ManagementItem::CLASS_KIND_TABLE)
+ return validateTableSchema(inBuffer);
+ else if (kind == ManagementItem::CLASS_KIND_EVENT)
+ return validateEventSchema(inBuffer);
+ return 0;
}
-size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+size_t ManagementBroker::validateTableSchema(Buffer& inBuffer)
{
uint32_t start = inBuffer.getPosition();
uint32_t end;
string text;
uint8_t hash[16];
- inBuffer.record();
- inBuffer.getShortString(text);
- inBuffer.getShortString(text);
- inBuffer.getBin128(hash);
+ try {
+ inBuffer.record();
+ uint8_t kind = inBuffer.getOctet();
+ if (kind != ManagementItem::CLASS_KIND_TABLE)
+ return 0;
- uint16_t propCount = inBuffer.getShort();
- uint16_t statCount = inBuffer.getShort();
- uint16_t methCount = inBuffer.getShort();
- uint16_t evntCount = inBuffer.getShort();
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
- for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- }
+ uint16_t propCount = inBuffer.getShort();
+ uint16_t statCount = inBuffer.getShort();
+ uint16_t methCount = inBuffer.getShort();
- for (uint16_t idx = 0; idx < methCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- if (!ft.isSet("argCount"))
- return 0;
- int argCount = ft.getInt("argCount");
- for (int mIdx = 0; mIdx < argCount; mIdx++) {
- FieldTable aft;
- aft.decode(inBuffer);
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
}
- }
- for (uint16_t idx = 0; idx < evntCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- if (!ft.isSet("argCount"))
- return 0;
- int argCount = ft.getInt("argCount");
- for (int mIdx = 0; mIdx < argCount; mIdx++) {
- FieldTable aft;
- aft.decode(inBuffer);
+ for (uint16_t idx = 0; idx < methCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
}
+ } catch (std::exception& e) {
+ return 0;
}
end = inBuffer.getPosition();
@@ -1030,24 +1074,34 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
return end - start;
}
-Mutex& ManagementBroker::getMutex()
+size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
{
- return userLock;
-}
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
-Buffer* ManagementBroker::startEventLH()
-{
- Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
- EncodeHeader(*outBuffer, 'e');
- outBuffer->putLongLong(uint64_t(Duration(now())));
- return outBuffer;
-}
+ try {
+ inBuffer.record();
+ uint8_t kind = inBuffer.getOctet();
+ if (kind != ManagementItem::CLASS_KIND_EVENT)
+ return 0;
-void ManagementBroker::finishEventLH(Buffer* outBuffer)
-{
- uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
- outBuffer->reset();
- SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event");
- delete outBuffer;
-}
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint16_t argCount = inBuffer.getShort();
+ for (uint16_t idx = 0; idx < argCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+ } catch (std::exception& e) {
+ return 0;
+ }
+
+ end = inBuffer.getPosition();
+ inBuffer.restore(); // restore original position
+ return end - start;
+}
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
index 23fba74b83..c0e0c50963 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -38,11 +38,11 @@ namespace management {
class ManagementBroker : public ManagementAgent
{
- private:
+private:
int threadPoolSize;
- public:
+public:
ManagementBroker ();
virtual ~ManagementBroker ();
@@ -52,13 +52,18 @@ class ManagementBroker : public ManagementAgent
void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
int getMaxThreads () { return threadPoolSize; }
- void RegisterClass (std::string packageName,
- std::string className,
+ void registerClass (std::string& packageName,
+ std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void registerEvent (std::string& packageName,
+ std::string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
ObjectId addObject (ManagementObject* object,
uint64_t persistId = 0);
- void clientAdded (void);
+ void raiseEvent(const ManagementEvent& event);
+ void clientAdded ();
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
@@ -68,7 +73,7 @@ class ManagementBroker : public ManagementAgent
uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
int getSignalFd () { assert(0); return -1; }
- private:
+private:
friend class ManagementAgent;
struct Periodic : public qpid::broker::TimerTask
@@ -127,15 +132,16 @@ class ManagementBroker : public ManagementAgent
struct SchemaClass
{
+ uint8_t kind;
ManagementObject::writeSchemaCall_t writeSchemaCall;
uint32_t pendingSequence;
size_t bufferLen;
uint8_t* buffer;
- SchemaClass(uint32_t seq) :
- writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
- SchemaClass(ManagementObject::writeSchemaCall_t call) :
- writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
+ SchemaClass(uint8_t _kind, uint32_t seq) :
+ kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
+ SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
+ kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
void appendSchema (framing::Buffer& buf);
};
@@ -154,12 +160,12 @@ class ManagementBroker : public ManagementAgent
framing::Uuid uuid;
sys::Mutex addLock;
sys::Mutex userLock;
- qpid::broker::Timer timer;
+ qpid::broker::Timer timer;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
std::string dataDir;
uint16_t interval;
- qpid::broker::Broker* broker;
+ qpid::broker::Broker* broker;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
@@ -173,10 +179,10 @@ class ManagementBroker : public ManagementAgent
char eventBuffer[MA_BUFFER_SIZE];
void writeData ();
- void PeriodicProcessing (void);
- void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void SendBuffer (framing::Buffer& buf,
+ void periodicProcessing (void);
+ void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void sendBuffer (framing::Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
std::string routingKey);
@@ -185,14 +191,15 @@ class ManagementBroker : public ManagementAgent
bool authorizeAgentMessageLH(qpid::broker::Message& msg);
void dispatchAgentCommandLH(qpid::broker::Message& msg);
- PackageMap::iterator FindOrAddPackageLH(std::string name);
- void AddClass(PackageMap::iterator pIter,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void EncodePackageIndication (framing::Buffer& buf,
+ PackageMap::iterator findOrAddPackageLH(std::string name);
+ void addClassLH(uint8_t kind,
+ PackageMap::iterator pIter,
+ std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void encodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
- void EncodeClassIndication (framing::Buffer& buf,
+ void encodeClassIndication (framing::Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter);
bool bankInUse (uint32_t bank);
@@ -212,10 +219,9 @@ class ManagementBroker : public ManagementAgent
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- size_t ValidateSchema(framing::Buffer&);
- sys::Mutex& getMutex();
- framing::Buffer* startEventLH();
- void finishEventLH(framing::Buffer* outBuffer);
+ size_t validateSchema(framing::Buffer&, uint8_t kind);
+ size_t validateTableSchema(framing::Buffer&);
+ size_t validateEventSchema(framing::Buffer&);
};
}}
diff --git a/cpp/src/qpid/management/ManagementEvent.h b/cpp/src/qpid/management/ManagementEvent.h
new file mode 100644
index 0000000000..d5a47e9144
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementEvent.h
@@ -0,0 +1,48 @@
+#ifndef _ManagementEvent_
+#define _ManagementEvent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+#include <qpid/framing/Buffer.h>
+#include <string>
+
+namespace qpid {
+namespace management {
+
+class ManagementAgent;
+
+class ManagementEvent : public ManagementItem {
+public:
+ typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&);
+ virtual ~ManagementEvent() {}
+
+ virtual writeSchemaCall_t getWriteSchemaCall(void) = 0;
+ virtual std::string& getEventName() const = 0;
+ virtual std::string& getPackageName() const = 0;
+ virtual uint8_t* getMd5Sum() const = 0;
+ virtual void encode(qpid::framing::Buffer&) const = 0;
+};
+
+}}
+
+#endif /*!_ManagementEvent_*/
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index e0386ee057..ce65ae3279 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -84,6 +84,21 @@ void ObjectId::decode(framing::Buffer& buffer)
second = buffer.getLongLong();
}
+namespace qpid {
+namespace management {
+
+std::ostream& operator<<(std::ostream& out, const ObjectId& i)
+{
+ out << "[" << ((i.first & 0xF000000000000000LL) >> 60) <<
+ "-" << ((i.first & 0x0FFF000000000000LL) >> 48) <<
+ "-" << ((i.first & 0x0000FFFFF0000000LL) >> 32) <<
+ "-" << (i.first & 0x000000000FFFFFFFLL) <<
+ "-" << i.second << "]";
+ return out;
+}
+
+}}
+
int ManagementObject::nextThreadIndex = 0;
void ManagementObject::writeTimestamps (Buffer& buf)
@@ -109,18 +124,3 @@ int ManagementObject::getThreadIndex() {
}
return thisIndex;
}
-
-Mutex& ManagementObject::getMutex()
-{
- return agent->getMutex();
-}
-
-Buffer* ManagementObject::startEventLH()
-{
- return agent->startEventLH();
-}
-
-void ManagementObject::finishEventLH(Buffer* buf)
-{
- agent->finishEventLH(buf);
-}
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 1b809f5125..3778d66b7e 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -46,7 +46,7 @@ public:
class ObjectId {
-private:
+protected:
const AgentAttachment* agent;
uint64_t first;
uint64_t second;
@@ -59,23 +59,11 @@ public:
bool operator<(const ObjectId &other) const;
void encode(framing::Buffer& buffer);
void decode(framing::Buffer& buffer);
+ friend std::ostream& operator<<(std::ostream&, const ObjectId&);
};
-class ManagementObject
-{
- protected:
-
- uint64_t createTime;
- uint64_t destroyTime;
- ObjectId objectId;
- bool configChanged;
- bool instChanged;
- bool deleted;
- Manageable* coreObject;
- sys::Mutex accessLock;
- ManagementAgent* agent;
- int maxThreads;
-
+class ManagementItem {
+public:
static const uint8_t TYPE_U8 = 1;
static const uint8_t TYPE_U16 = 2;
static const uint8_t TYPE_U32 = 3;
@@ -107,15 +95,35 @@ class ManagementObject
static const uint8_t FLAG_INDEX = 0x02;
static const uint8_t FLAG_END = 0x80;
- static int nextThreadIndex;
+ const static uint8_t CLASS_KIND_TABLE = 1;
+ const static uint8_t CLASS_KIND_EVENT = 2;
+
+
+
+public:
+ virtual ~ManagementItem() {}
+};
+
+class ManagementObject : public ManagementItem
+{
+ protected:
+
+ uint64_t createTime;
+ uint64_t destroyTime;
+ ObjectId objectId;
+ bool configChanged;
+ bool instChanged;
+ bool deleted;
+ Manageable* coreObject;
+ sys::Mutex accessLock;
+ ManagementAgent* agent;
+ int maxThreads;
+
+ static int nextThreadIndex;
int getThreadIndex();
void writeTimestamps (qpid::framing::Buffer& buf);
- sys::Mutex& getMutex();
- framing::Buffer* startEventLH();
- void finishEventLH(framing::Buffer* buf);
-
public:
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
@@ -129,14 +137,14 @@ class ManagementObject
virtual void writeProperties(qpid::framing::Buffer& buf) = 0;
virtual void writeStatistics(qpid::framing::Buffer& buf,
bool skipHeaders = false) = 0;
- virtual void doMethod (std::string methodName,
+ virtual void doMethod (std::string& methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
virtual void setReference (ObjectId objectId);
- virtual std::string& getClassName (void) = 0;
- virtual std::string& getPackageName (void) = 0;
- virtual uint8_t* getMd5Sum (void) = 0;
+ virtual std::string& getClassName (void) const = 0;
+ virtual std::string& getPackageName (void) const = 0;
+ virtual uint8_t* getMd5Sum (void) const = 0;
void setObjectId (ObjectId oid) { objectId = oid; }
ObjectId getObjectId (void) { return objectId; }
diff --git a/java/client/src/main/java/org/apache/qpid/management/Names.java b/java/client/src/main/java/org/apache/qpid/management/Names.java
index 8ab9e13d4f..64e5c0fef2 100644
--- a/java/client/src/main/java/org/apache/qpid/management/Names.java
+++ b/java/client/src/main/java/org/apache/qpid/management/Names.java
@@ -35,7 +35,7 @@ public interface Names
String METHOD_REPLY_QUEUE_PREFIX = "reply.";
String AMQ_DIRECT_QUEUE = "amq.direct";
- String AGENT_ROUTING_KEY = "agent.0";
+ String AGENT_ROUTING_KEY = "agent.1.0";
String BROKER_ROUTING_KEY = "broker";
@@ -49,4 +49,4 @@ public interface Names
String CONFIGURATION_FILE_NAME = "/org/apache/qpid/management/config.xml";
String ARG_COUNT_PARAM_NAME = "argCount";
-} \ No newline at end of file
+}
diff --git a/java/client/src/main/java/org/apache/qpid/management/Protocol.java b/java/client/src/main/java/org/apache/qpid/management/Protocol.java
index f50a85f28a..185f417448 100644
--- a/java/client/src/main/java/org/apache/qpid/management/Protocol.java
+++ b/java/client/src/main/java/org/apache/qpid/management/Protocol.java
@@ -27,8 +27,8 @@ package org.apache.qpid.management;
*/
public interface Protocol
{
- String MAGIC_NUMBER = "AM1";
+ String MAGIC_NUMBER = "AM2";
byte [] METHOD_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"M").getBytes();
byte [] SCHEMA_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"S").getBytes();
-} \ No newline at end of file
+}
diff --git a/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java b/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java
index 7a0ee556d2..497e264581 100644
--- a/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java
@@ -49,6 +49,10 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler
{
try
{
+ int classKind = decoder.readUint8();
+ if (classKind != 1) {
+ return;
+ }
String packageName = decoder.readStr8();
String className = decoder.readStr8();
@@ -57,7 +61,7 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler
int howManyProperties = decoder.readUint16();
int howManyStatistics = decoder.readUint16();
int howManyMethods = decoder.readUint16();
- int howManyEvents = decoder.readUint16();
+ int howManyEvents = 0;
// FIXME : Divide between schema error and raw data conversion error!!!!
_domainModel.addSchema(
@@ -155,4 +159,4 @@ public class SchemaResponseMessageHandler extends BaseMessageHandler
}
return result;
}
- } \ No newline at end of file
+ }
diff --git a/python/commands/qpid-config b/python/commands/qpid-config
index 13b489abae..8b011778d6 100755
--- a/python/commands/qpid-config
+++ b/python/commands/qpid-config
@@ -84,8 +84,8 @@ class BrokerManager:
self.qmf.delBroker(self.broker)
def Overview (self):
- exchanges = self.qmf.getObjects(cls="exchange")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ queues = self.qmf.getObjects(_class="queue")
print "Total Exchanges: %d" % len (exchanges)
etype = {}
for ex in exchanges:
@@ -106,7 +106,7 @@ class BrokerManager:
print " non-durable: %d" % (len (queues) - _durable)
def ExchangeList (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
+ exchanges = self.qmf.getObjects(_class="exchange")
print "Durable Type Bindings Exchange Name"
print "======================================================="
for ex in exchanges:
@@ -114,9 +114,9 @@ class BrokerManager:
print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name)
def ExchangeListRecurse (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
- bindings = self.qmf.getObjects(cls="binding")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ bindings = self.qmf.getObjects(_class="binding")
+ queues = self.qmf.getObjects(_class="queue")
for ex in exchanges:
if self.match (ex.name, filter):
print "Exchange '%s' (%s)" % (ex.name, ex.type)
@@ -130,8 +130,8 @@ class BrokerManager:
def QueueList (self, filter):
- queues = self.qmf.getObjects(cls="queue")
- journals = self.qmf.getObjects(cls="journal")
+ queues = self.qmf.getObjects(_class="queue")
+ journals = self.qmf.getObjects(_class="journal")
print " Store Size"
print "Durable AutoDel Excl Bindings (files x file pages) Queue Name"
print "==========================================================================================="
@@ -151,9 +151,9 @@ class BrokerManager:
YN (q.exclusive), q.bindingCount, q.name)
def QueueListRecurse (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
- bindings = self.qmf.getObjects(cls="binding")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ bindings = self.qmf.getObjects(_class="binding")
+ queues = self.qmf.getObjects(_class="queue")
for queue in queues:
if self.match (queue.name, filter):
print "Queue '%s'" % queue.name
diff --git a/python/commands/qpid-printevents b/python/commands/qpid-printevents
index 970607c797..6efd472221 100755
--- a/python/commands/qpid-printevents
+++ b/python/commands/qpid-printevents
@@ -30,16 +30,13 @@ class EventConsole(Console):
def event(self, broker, event):
print event
- def heartbeat(self, agent, timestamp):
- print "Heartbeat"
-
##
## Main Program
##
def main():
_usage = "%prog [options] [broker-addr]..."
_description = \
-"""Collect and print events from one of more Qpid message brokers. If no broker-addr is
+"""Collect and print events from one or more Qpid message brokers. If no broker-addr is
supplied, %prog will connect to 'localhost:5672'.
broker-addr is of the form: [username/password@] hostname | ip-address [:<port>]
ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
index f9f938cdec..4dadcd543b 100755
--- a/python/commands/qpid-route
+++ b/python/commands/qpid-route
@@ -62,7 +62,7 @@ class RouteManager:
self.qmf.delBroker(self.broker)
def getLink (self):
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
for link in links:
if "%s:%d" % (link.host, link.port) == self.src.name ():
return link
@@ -74,7 +74,7 @@ class RouteManager:
print "Linking broker to itself is not permitted"
sys.exit(1)
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
if link != None:
@@ -92,7 +92,7 @@ class RouteManager:
def DelLink (self, srcBroker):
self.src = qmfconsole.BrokerURL(srcBroker)
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
if link == None:
@@ -103,7 +103,7 @@ class RouteManager:
print "Close method returned:", res.status, res.text
def ListLinks (self):
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
if len(links) == 0:
print "No Links Found"
else:
@@ -119,7 +119,7 @@ class RouteManager:
if self.dest.name() == self.src.name():
raise Exception("Linking broker to itself is not permitted")
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
@@ -140,7 +140,7 @@ class RouteManager:
if link == None:
raise Exception("Protocol Error - Missing link ID")
- bridges = self.qmf.getObjects(cls="bridge")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.key == routingKey:
@@ -164,7 +164,7 @@ class RouteManager:
raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
sys.exit (0)
- bridges = self.qmf.getObjects(cls="bridge")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
if _verbose:
@@ -186,8 +186,8 @@ class RouteManager:
raise Exception("Route not found")
def ListRoutes (self):
- links = self.qmf.getObjects(cls="link")
- bridges = self.qmf.getObjects(cls="bridge")
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
myLink = None
@@ -199,8 +199,8 @@ class RouteManager:
print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
def ClearAllRoutes (self):
- links = self.qmf.getObjects(cls="link")
- bridges = self.qmf.getObjects(cls="bridge")
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if _verbose:
@@ -218,7 +218,7 @@ class RouteManager:
print "Ok"
if _dellink:
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
for link in links:
if _verbose:
print "Deleting Link: %s:%d... " % (link.host, link.port),
diff --git a/python/qpid/management.py b/python/qpid/management.py
index 8d8339b2c6..81d9dbe030 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -285,7 +285,7 @@ class managementClient:
ft = {}
ft["_class"] = className
codec.write_map (ft)
- msg = channel.message(codec.encoded, routing_key="agent.%d" % bank)
+ msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank)
channel.send ("qpid.management", msg)
def syncWaitForStable (self, channel):
@@ -398,7 +398,7 @@ class managementClient:
""" Compose the header of a management message. """
codec.write_uint8 (ord ('A'))
codec.write_uint8 (ord ('M'))
- codec.write_uint8 (ord ('1'))
+ codec.write_uint8 (ord ('2'))
codec.write_uint8 (opcode)
codec.write_uint32 (seq)
@@ -412,7 +412,7 @@ class managementClient:
if octet != 'M':
return None
octet = chr (codec.read_uint8 ())
- if octet != '1':
+ if octet != '2':
return None
opcode = chr (codec.read_uint8 ())
seq = codec.read_uint32 ()
@@ -433,7 +433,7 @@ class managementClient:
elif typecode == 6:
codec.write_str8 (value)
elif typecode == 7:
- codec.write_vbin32 (value)
+ codec.write_str16 (value)
elif typecode == 8: # ABSTIME
codec.write_uint64 (long (value))
elif typecode == 9: # DELTATIME
@@ -476,7 +476,7 @@ class managementClient:
elif typecode == 6:
data = str (codec.read_str8 ())
elif typecode == 7:
- data = codec.read_vbin32 ()
+ data = codec.read_str16 ()
elif typecode == 8: # ABSTIME
data = codec.read_uint64 ()
elif typecode == 9: # DELTATIME
@@ -604,6 +604,9 @@ class managementClient:
ch.send ("qpid.management", smsg)
def handleClassInd (self, ch, codec):
+ kind = codec.read_uint8()
+ if kind != 1: # This API doesn't handle new-style events
+ return
pname = str (codec.read_str8())
cname = str (codec.read_str8())
hash = codec.read_bin128()
@@ -656,13 +659,15 @@ class managementClient:
def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
self.decOutstanding (ch)
+ kind = codec.read_uint8()
+ if kind != 1: # This API doesn't handle new-style events
+ return
packageName = str (codec.read_str8 ())
className = str (codec.read_str8 ())
hash = codec.read_bin128 ()
configCount = codec.read_uint16 ()
instCount = codec.read_uint16 ()
methodCount = codec.read_uint16 ()
- eventCount = codec.read_uint16 ()
if packageName not in self.packages:
return
@@ -676,7 +681,6 @@ class managementClient:
configs = []
insts = []
methods = {}
- events = {}
configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
insts.append (("id", 4, None, None))
@@ -765,42 +769,14 @@ class managementClient:
args.append (arg)
methods[mname] = (mdesc, args)
- for idx in range (eventCount):
- ft = codec.read_map ()
- ename = str (ft["name"])
- argCount = ft["argCount"]
- if "desc" in ft:
- edesc = str (ft["desc"])
- else:
- edesc = None
-
- args = []
- for aidx in range (argCount):
- ft = codec.read_map ()
- name = str (ft["name"])
- type = ft["type"]
- unit = None
- desc = None
-
- for key, value in ft.items ():
- if key == "unit":
- unit = str (value)
- elif key == "desc":
- desc = str (value)
-
- arg = (name, type, unit, desc)
- args.append (arg)
- events[ename] = (edesc, args)
-
schemaClass = {}
schemaClass['C'] = configs
schemaClass['I'] = insts
schemaClass['M'] = methods
- schemaClass['E'] = events
self.schema[classKey] = schemaClass
if self.schemaCb != None:
- self.schemaCb (ch.context, classKey, configs, insts, methods, events)
+ self.schemaCb (ch.context, classKey, configs, insts, methods, {})
def parsePresenceMasks(self, codec, schemaClass):
""" Generate a list of not-present properties """
@@ -896,7 +872,7 @@ class managementClient:
codec.write_str8 (classId[1])
codec.write_bin128 (classId[2])
codec.write_str8 (methodName)
- bank = objId.getBank()
+ bank = "%d.%d" % (objId.getBroker(), objId.getBank())
# Encode args according to schema
if classId not in self.schema:
@@ -926,5 +902,5 @@ class managementClient:
packageName = classId[0]
className = classId[1]
- msg = channel.message(codec.encoded, "agent." + str(bank))
+ msg = channel.message(codec.encoded, "agent." + bank)
channel.send ("qpid.management", msg)
diff --git a/python/qpid/managementdata.py b/python/qpid/managementdata.py
index d86dd3a360..2bf66a5e5b 100644
--- a/python/qpid/managementdata.py
+++ b/python/qpid/managementdata.py
@@ -546,10 +546,10 @@ class ManagementData:
for classKey in sorted:
tuple = self.schema[classKey]
row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]),
- len (tuple[2]), len (tuple[3]))
+ len (tuple[2]))
rows.append (row)
self.disp.table ("Classes in Schema:",
- ("Class", "Properties", "Statistics", "Methods", "Events"),
+ ("Class", "Properties", "Statistics", "Methods"),
rows)
finally:
self.lock.release ()
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index 3800e54b5b..c8035e87f2 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -49,7 +49,7 @@ class Console:
""" Invoked when a QMF package is discovered. """
pass
- def newClass(self, classKey):
+ def newClass(self, kind, classKey):
""" Invoked when a new class is discovered. Session.getSchema can be
used to obtain details about the class."""
pass
@@ -158,7 +158,7 @@ class Session:
raise Exception(broker.error)
self.brokers.append(broker)
- self.getObjects(broker=broker, cls="agent")
+ self.getObjects(broker=broker, _class="agent")
return broker
def delBroker(self, broker):
@@ -219,35 +219,36 @@ class Session:
The class for queried objects may be specified in one of the following ways:
- schema = <schema> - supply a schema object returned from getSchema
- key = <key> - supply a classKey from the list returned by getClasses
- cls = <name> - supply a class name as a string
+ _schema = <schema> - supply a schema object returned from getSchema.
+ _key = <key> - supply a classKey from the list returned by getClasses.
+ _class = <name> - supply a class name as a string. If the class name exists
+ in multiple packages, a _package argument may also be supplied.
If objects should be obtained from only one agent, use the following argument.
Otherwise, the query will go to all agents.
- agent = <agent> - supply an agent from the list returned by getAgents
+ _agent = <agent> - supply an agent from the list returned by getAgents.
If the get query is to be restricted to one broker (as opposed to all connected brokers),
add the following argument:
- broker = <broker> - supply a broker as returned by addBroker
+ _broker = <broker> - supply a broker as returned by addBroker.
If additional arguments are supplied, they are used as property selectors. For example,
if the argument name="test" is supplied, only objects whose "name" property is "test"
will be returned in the result.
"""
- if "broker" in kwargs:
+ if "_broker" in kwargs:
brokerList = []
- brokerList.append(kwargs["broker"])
+ brokerList.append(kwargs["_broker"])
else:
brokerList = self.brokers
for broker in brokerList:
broker._waitForStable()
agentList = []
- if "agent" in kwargs:
- agent = kwargs["agent"]
+ if "_agent" in kwargs:
+ agent = kwargs["_agent"]
if agent.broker not in brokerList:
raise Exception("Supplied agent is not accessible through the supplied broker")
agentList.append(agent)
@@ -257,11 +258,14 @@ class Session:
agentList.append(agent)
cname = None
- if "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey()
- elif "key" in kwargs: pname, cname, hash = kwargs["key"]
- elif "cls" in kwargs: pname, cname, hash = None, kwargs["cls"], None
+ if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey()
+ elif "_key" in kwargs: pname, cname, hash = kwargs["_key"]
+ elif "_class" in kwargs:
+ pname, cname, hash = None, kwargs["_class"], None
+ if "_package" in kwargs:
+ pname = kwargs["_package"]
if cname == None:
- raise Exception("No class supplied, use 'schema', 'key', or 'cls' argument")
+ raise Exception("No class supplied, use '_schema', '_key', or '_class' argument")
map = {}
map["_class"] = cname
if pname != None: map["_package"] = pname
@@ -269,7 +273,7 @@ class Session:
self.getSelect = []
for item in kwargs:
- if item != "schema" and item != "key" and item != "cls":
+ if item[0] != '_':
self.getSelect.append((item, kwargs[item]))
self.getResult = []
@@ -282,7 +286,7 @@ class Session:
self.cv.release()
broker._setHeader(sendCodec, 'G', seq)
sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank)
+ smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
broker._send(smsg)
starttime = time()
@@ -382,6 +386,7 @@ class Session:
self.cv.release()
def _handleClassInd(self, broker, codec, seq):
+ kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
@@ -431,17 +436,18 @@ class Session:
self.console.event(broker, event)
def _handleSchemaResp(self, broker, codec, seq):
+ kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
classKey = (pname, cname, hash)
- _class = SchemaClass(classKey, codec)
+ _class = SchemaClass(kind, classKey, codec)
self.cv.acquire()
self.packages[pname][(cname, hash)] = _class
self.cv.release()
broker._decOutstanding()
if self.console != None:
- self.console.newClass(classKey)
+ self.console.newClass(kind, classKey)
def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
pname = str(codec.read_str8())
@@ -485,7 +491,7 @@ class Session:
def _selectMatch(self, object):
""" Check the object against self.getSelect to check for a match """
for key, value in self.getSelect:
- for prop, propval in object.properties:
+ for prop, propval in object.getProperties():
if key == prop.name and value != propval:
return False
return True
@@ -497,7 +503,7 @@ class Session:
elif typecode == 3: data = codec.read_uint32() # U32
elif typecode == 4: data = codec.read_uint64() # U64
elif typecode == 6: data = str(codec.read_str8()) # SSTR
- elif typecode == 7: data = codec.read_vbin32() # LSTR
+ elif typecode == 7: data = codec.read_str16() # LSTR
elif typecode == 8: data = codec.read_int64() # ABSTIME
elif typecode == 9: data = codec.read_uint64() # DELTATIME
elif typecode == 10: data = ObjectId(codec) # REF
@@ -521,7 +527,7 @@ class Session:
elif typecode == 3: codec.write_uint32 (long(value)) # U32
elif typecode == 4: codec.write_uint64 (long(value)) # U64
elif typecode == 6: codec.write_str8 (value) # SSTR
- elif typecode == 7: codec.write_vbin32 (value) # LSTR
+ elif typecode == 7: codec.write_str16 (value) # LSTR
elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME
elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME
elif typecode == 10: value.encode (codec) # REF
@@ -577,30 +583,42 @@ class ClassKey:
class SchemaClass:
""" """
- def __init__(self, key, codec):
+ CLASS_KIND_TABLE = 1
+ CLASS_KIND_EVENT = 2
+
+ def __init__(self, kind, key, codec):
+ self.kind = kind
self.classKey = key
self.properties = []
self.statistics = []
- self.methods = []
- self.events = []
-
- propCount = codec.read_uint16()
- statCount = codec.read_uint16()
- methodCount = codec.read_uint16()
- eventCount = codec.read_uint16()
-
- for idx in range(propCount):
- self.properties.append(SchemaProperty(codec))
- for idx in range(statCount):
- self.statistics.append(SchemaStatistic(codec))
- for idx in range(methodCount):
- self.methods.append(SchemaMethod(codec))
- for idx in range(eventCount):
- self.events.append(SchemaEvent(codec))
+ self.methods = []
+ self.arguments = []
+
+ if self.kind == self.CLASS_KIND_TABLE:
+ propCount = codec.read_uint16()
+ statCount = codec.read_uint16()
+ methodCount = codec.read_uint16()
+ for idx in range(propCount):
+ self.properties.append(SchemaProperty(codec))
+ for idx in range(statCount):
+ self.statistics.append(SchemaStatistic(codec))
+ for idx in range(methodCount):
+ self.methods.append(SchemaMethod(codec))
+
+ elif self.kind == self.CLASS_KIND_EVENT:
+ argCount = codec.read_uint16()
+ for idx in range(argCount):
+ self.arguments.append(SchemaArgument(codec, methodArg=False))
def __repr__(self):
pname, cname, hash = self.classKey
- result = "Class: %s:%s " % (pname, cname)
+ if self.kind == self.CLASS_KIND_TABLE:
+ kindStr = "Table"
+ elif self.kind == self.CLASS_KIND_EVENT:
+ kindStr = "Event"
+ else:
+ kindStr = "Unsupported"
+ result = "%s Class: %s:%s " % (kindStr, pname, cname)
result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash)
return result
@@ -620,9 +638,9 @@ class SchemaClass:
""" Return the list of methods for the class. """
return self.methods
- def getEvents(self):
+ def getArguments(self):
""" Return the list of events for the class. """
- return self.events
+ return self.arguments
class SchemaProperty:
""" """
@@ -693,33 +711,6 @@ class SchemaMethod:
result += ")"
return result
-class SchemaEvent:
- """ """
- def __init__(self, codec):
- map = codec.read_map()
- self.name = str(map["name"])
- argCount = map["argCount"]
- if "desc" in map:
- self.desc = str(map["desc"])
- else:
- self.desc = None
- self.arguments = []
-
- for idx in range(argCount):
- self.arguments.append(SchemaArgument(codec, methodArg=False))
-
- def __repr__(self):
- result = self.name + "("
- first = True
- for arg in self.arguments:
- if first:
- first = False
- else:
- result += ", "
- result += arg.name
- result += ")"
- return result
-
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -743,7 +734,7 @@ class SchemaArgument:
elif key == "desc" : self.desc = str(value)
elif key == "default" : self.default = str(value)
-class ObjectId(object):
+class ObjectId:
""" Object that represents QMF object identifiers """
def __init__(self, codec, first=0, second=0):
if codec:
@@ -800,80 +791,86 @@ class Object(object):
""" """
def __init__(self, session, broker, schema, codec, prop, stat):
""" """
- self.session = session
- self.broker = broker
- self.schema = schema
- self.currentTime = codec.read_uint64()
- self.createTime = codec.read_uint64()
- self.deleteTime = codec.read_uint64()
- self.objectId = ObjectId(codec)
- self.properties = []
- self.statistics = []
+ self._session = session
+ self._broker = broker
+ self._schema = schema
+ self._currentTime = codec.read_uint64()
+ self._createTime = codec.read_uint64()
+ self._deleteTime = codec.read_uint64()
+ self._objectId = ObjectId(codec)
+ self._properties = []
+ self._statistics = []
if prop:
notPresent = self._parsePresenceMasks(codec, schema)
for property in schema.getProperties():
if property.name in notPresent:
- self.properties.append((property, None))
+ self._properties.append((property, None))
else:
- self.properties.append((property, self.session._decodeValue(codec, property.type)))
+ self._properties.append((property, self._session._decodeValue(codec, property.type)))
if stat:
for statistic in schema.getStatistics():
- self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type)))
+ self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
def getObjectId(self):
""" Return the object identifier for this object """
- return self.objectId
+ return self._objectId
def getClassKey(self):
""" Return the class-key that references the schema describing this object. """
- return self.schema.getKey()
+ return self._schema.getKey()
def getSchema(self):
""" Return the schema that describes this object. """
- return self.schema
+ return self._schema
def getMethods(self):
""" Return a list of methods available for this object. """
- return self.schema.getMethods()
+ return self._schema.getMethods()
def getTimestamps(self):
""" Return the current, creation, and deletion times for this object. """
- return self.currentTime, self.createTime, self.deleteTime
+ return self._currentTime, self._createTime, self._deleteTime
def getIndex(self):
""" Return a string describing this object's primary key. """
result = ""
- for property, value in self.properties:
+ for property, value in self._properties:
if property.index:
if result != "":
result += ":"
result += str(value)
return result
+ def getProperties(self):
+ return self._properties
+
+ def getStatistics(self):
+ return self._statistics
+
def __repr__(self):
return self.getIndex()
def __getattr__(self, name):
- for method in self.schema.getMethods():
+ for method in self._schema.getMethods():
if name == method.name:
return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self.properties:
+ for property, value in self._properties:
if name == property.name:
return value
- for statistic, value in self.statistics:
+ for statistic, value in self._statistics:
if name == statistic.name:
return value
raise Exception("Type Object has no attribute '%s'" % name)
def _invoke(self, name, args, kwargs):
- for method in self.schema.getMethods():
+ for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
- sendCodec = Codec(self.broker.conn.spec)
- seq = self.session.seqMgr._reserve((self, method))
- self.broker._setHeader(sendCodec, 'M', seq)
- self.objectId.encode(sendCodec)
- pname, cname, hash = self.schema.getKey()
+ sendCodec = Codec(self._broker.conn.spec)
+ seq = self._session.seqMgr._reserve((self, method))
+ self._broker._setHeader(sendCodec, 'M', seq)
+ self._objectId.encode(sendCodec)
+ pname, cname, hash = self._schema.getKey()
sendCodec.write_str8(pname)
sendCodec.write_str8(cname)
sendCodec.write_bin128(hash)
@@ -888,29 +885,30 @@ class Object(object):
for arg in method.arguments:
if arg.dir.find("I") != -1:
- self.session._encodeValue(sendCodec, args[aIdx], arg.type)
+ self._session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
- smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
- self.broker.cv.acquire()
- self.broker.syncInFlight = True
- self.broker.cv.release()
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
+ (self._objectId.getBroker(), self._objectId.getBank()))
+ self._broker.cv.acquire()
+ self._broker.syncInFlight = True
+ self._broker.cv.release()
- self.broker._send(smsg)
+ self._broker._send(smsg)
- self.broker.cv.acquire()
+ self._broker.cv.acquire()
starttime = time()
- while self.broker.syncInFlight and self.broker.error == None:
- self.broker.cv.wait(self.broker.SYNC_TIME)
- if time() - starttime > self.broker.SYNC_TIME:
- self.broker.cv.release()
- self.session.seqMgr._release(seq)
+ while self._broker.syncInFlight and self._broker.error == None:
+ self._broker.cv.wait(self._broker.SYNC_TIME)
+ if time() - starttime > self._broker.SYNC_TIME:
+ self._broker.cv.release()
+ self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
- self.broker.cv.release()
- if self.broker.error != None:
- errorText = self.broker.error
- self.broker.error = None
+ self._broker.cv.release()
+ if self._broker.error != None:
+ errorText = self._broker.error
+ self._broker.error = None
raise Exception(errorText)
- return self.broker.syncResult
+ return self._broker.syncResult
raise Exception("Invalid Method (software defect) [%s]" % name)
def _parsePresenceMasks(self, codec, schema):
@@ -954,7 +952,7 @@ class Broker:
self.authUser = authUser
self.authPass = authPass
self.agents = {}
- self.agents[0] = Agent(self, 0, "BrokerAgent")
+ self.agents[0] = Agent(self, "1.0", "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
@@ -1040,14 +1038,15 @@ class Broker:
self.error = "Connect Failed %d - %s" % (e[0], e[1])
def _updateAgent(self, obj):
- if obj.deleteTime == 0:
- if obj.objectIdBank not in self.agents:
- agent = Agent(self, obj.objectIdBank, obj.label)
- self.agents[obj.objectIdBank] = agent
+ bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank)
+ if obj._deleteTime == 0:
+ if bankKey not in self.agents:
+ agent = Agent(self, bankKey, obj.label)
+ self.agents[bankKey] = agent
if self.session.console != None:
self.session.console.newAgent(agent)
else:
- agent = self.agents.pop(obj.objectIdBank, None)
+ agent = self.agents.pop(bankKey, None)
if agent != None and self.session.console != None:
self.session.console.delAgent(agent)
@@ -1055,7 +1054,7 @@ class Broker:
""" Compose the header of a management message. """
codec.write_uint8(ord('A'))
codec.write_uint8(ord('M'))
- codec.write_uint8(ord('1'))
+ codec.write_uint8(ord('2'))
codec.write_uint8(ord(opcode))
codec.write_uint32(seq)
@@ -1068,7 +1067,7 @@ class Broker:
if octet != 'M':
return None, None
octet = chr(codec.read_uint8())
- if octet != '1':
+ if octet != '2':
return None, None
opcode = chr(codec.read_uint8())
seq = codec.read_uint32()
@@ -1164,28 +1163,24 @@ class Agent:
self.label = label
def __repr__(self):
- return "Agent at bank %d (%s)" % (self.bank, self.label)
+ return "Agent at bank %s (%s)" % (self.bank, self.label)
class Event:
""" """
def __init__(self, session, codec):
self.session = session
- self.timestamp = codec.read_int64()
- self.objectId = ObjectId(codec)
pname = codec.read_str8()
cname = codec.read_str8()
hash = codec.read_bin128()
self.classKey = (pname, cname, hash)
- self.name = codec.read_str8()
+ self.timestamp = codec.read_int64()
+ self.schema = None
if pname in session.packages:
if (cname, hash) in session.packages[pname]:
- schema = session.packages[pname][(cname, hash)]
- for event in schema.getEvents():
- if event.name == self.name:
- self.schemaEvent = event
- self.arguments = {}
- for arg in event.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type)
+ self.schema = session.packages[pname][(cname, hash)]
+ self.arguments = {}
+ for arg in self.schema.arguments:
+ self.arguments[arg.name] = session._decodeValue(codec, arg.type)
def __repr__(self):
return self.getSyslogText()
@@ -1202,10 +1197,15 @@ class Event:
def getName(self):
return self.name
+ def getSchema(self):
+ return self.schema
+
def getSyslogText(self):
+ if self.schema == None:
+ return "<uninterpretable>"
out = strftime("%c", gmtime(self.timestamp / 1000000000))
- out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name
- for arg in self.schemaEvent.arguments:
+ out += " " + self.classKey[0] + ":" + self.classKey[1]
+ for arg in self.schema.arguments:
out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type)
return out
@@ -1247,8 +1247,8 @@ class DebugConsole(Console):
def newPackage(self, name):
print "newPackage:", name
- def newClass(self, classKey):
- print "newClass:", classKey
+ def newClass(self, kind, classKey):
+ print "newClass:", kind, classKey
def newAgent(self, agent):
print "newAgent:", agent
diff --git a/python/tests_0-10/management.py b/python/tests_0-10/management.py
index eea1b29404..efec2b8a92 100644
--- a/python/tests_0-10/management.py
+++ b/python/tests_0-10/management.py
@@ -59,7 +59,7 @@ class ManagementTest (TestBase010):
session = self.session
self.startQmf()
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
self.assertEqual (len(brokers), 1)
broker = brokers[0]
@@ -147,43 +147,43 @@ class ManagementTest (TestBase010):
session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
session.exchange_bind(queue="dest-queue", exchange="amq.direct")
- queues = self.qmf.getObjects(cls="queue")
+ queues = self.qmf.getObjects(_class="queue")
"Move 10 messages from src-queue to dest-queue"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
self.assertEqual (result.status, 0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,10)
self.assertEqual (dq.msgDepth,10)
"Move all remaining messages to destination"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
self.assertEqual (result.status,0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,0)
self.assertEqual (dq.msgDepth,20)
"Use a bad source queue name"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
self.assertEqual (result.status,4)
"Use a bad destination queue name"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
self.assertEqual (result.status,4)
" Use a large qty (40) to move from dest-queue back to "
" src-queue- should move all "
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
self.assertEqual (result.status,0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,20)
self.assertEqual (dq.msgDepth,0)
@@ -216,23 +216,23 @@ class ManagementTest (TestBase010):
msg = Message(props, body)
session.message_transfer(destination="amq.direct", message=msg)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
"Purge top message from purge-queue"
result = pq.purge(1)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,19)
"Purge top 9 messages from purge-queue"
result = pq.purge(9)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,10)
"Purge all messages from purge-queue"
result = pq.purge(0)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index a06ddca44b..dc42a4108e 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -86,17 +86,6 @@
<arg name="password" dir="I" type="sstr"/>
</method>
- <event name="agentConnect" desc="QMF Management Agent has connected to the broker">
- <arg name="remoteAddress" type="sstr"/>
- <arg name="label" type="sstr"/>
- <arg name="brokerBank" type="uint32"/>
- <arg name="agentBank" type="uint32"/>
- </event>
-
- <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker">
- <arg name="remoteAddress" type="sstr"/>
- </event>
-
<method name="queueMoveMessages" desc="Move messages from one queue to another">
<arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/>
<arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/>
@@ -115,7 +104,8 @@
<property name="label" type="sstr" access="RO" desc="Label for agent"/>
<property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/>
<property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/>
- <property name="objectIdBank" type="uint32" access="RO" desc="Assigned object-id bank"/>
+ <property name="brokerBank" type="uint32" access="RO" desc="Assigned object-id broker bank"/>
+ <property name="agentBank" type="uint32" access="RO" desc="Assigned object-id agent bank"/>
</class>
<!--
@@ -218,7 +208,7 @@
<property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/>
<property name="address" type="sstr" access="RC" index="y"/>
<property name="incoming" type="bool" access="RC"/>
- <property name="SystemConnection" type="bool" access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation ,...)"/>
+ <property name="SystemConnection" type="bool" access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation, ...)"/>
<statistic name="closing" type="bool" desc="This client is closing by management request"/>
<statistic name="federationLink" type="bool" desc="Is this a federation link"/>
@@ -305,5 +295,36 @@
<method name="resetLifespan"/>
<method name="close"/>
</class>
+
+ <eventArguments>
+ <arg name="altEx" type="sstr" desc="Name of the alternate exchange"/>
+ <arg name="args" type="map" desc="Supplemental arguments or parameters supplied"/>
+ <arg name="autoDel" type="bool" desc="Created object is automatically deleted when no longer in use"/>
+ <arg name="dest" type="sstr" desc="Destination tag for a subscription"/>
+ <arg name="disp" type="sstr" desc="Disposition of a declaration: 'created' if object was created, 'existing' if object already existed"/>
+ <arg name="durable" type="bool" desc="Created object is durable"/>
+ <arg name="exName" type="sstr" desc="Name of an exchange"/>
+ <arg name="exType" type="sstr" desc="Type of an exchange"/>
+ <arg name="excl" type="bool" desc="Created object is exclusive for the use of the owner only"/>
+ <arg name="key" type="sstr" desc="Key text used for routing or binding"/>
+ <arg name="qName" type="sstr" desc="Name of a queue"/>
+ <arg name="rhost" type="sstr" desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/>
+ <arg name="user" type="sstr" desc="Authentication identity"/>
+ </eventArguments>
+
+ <event name="clientConnect" args="rhost, user"/>
+ <event name="clientDisconnect" args="rhost, user"/>
+ <event name="agentConnect" args="rhost, user"/>
+ <event name="agentDisconnect" args="rhost, user"/>
+ <event name="brokerConnect" args="rhost, user"/>
+ <event name="brokerDisconnect" args="rhost, user"/>
+ <event name="queueDeclare" args="rhost, user, qName, durable, excl, autoDel, args, disp"/>
+ <event name="queueDelete" args="rhost, user, qName"/>
+ <event name="exchangeDeclare" args="rhost, user, exName, exType, altEx, durable, autoDel, args, disp"/>
+ <event name="exchangeDelete" args="rhost, user, exName"/>
+ <event name="bind" args="rhost, user, exName, qName, key, args"/>
+ <event name="unbind" args="rhost, user, exName, qName, key"/>
+ <event name="subscribe" args="rhost, user, qName, dest, excl, args"/>
+ <event name="unsubscribe" args="rhost, user, dest"/>
</schema>