diff options
| author | Ted Ross <tross@apache.org> | 2008-06-02 16:01:51 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-06-02 16:01:51 +0000 |
| commit | 40c8b6f844ce64fc4245e5f91e6b1eaea2fc9e94 (patch) | |
| tree | 80438fbfb8739e9189fdad70d8271ae2ca8d26f4 | |
| parent | e1c0b830b67e68be71e65ef18657e746ed6b971f (diff) | |
| download | qpid-python-40c8b6f844ce64fc4245e5f91e6b1eaea2fc9e94.tar.gz | |
QPID-1113 Management cleanup and performance enhancements
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@662470 13f79535-47bb-0310-9956-ffa450edef68
28 files changed, 390 insertions, 335 deletions
diff --git a/cpp/managementgen/schema.py b/cpp/managementgen/schema.py index 7a9f23ea76..6e48003ab2 100755 --- a/cpp/managementgen/schema.py +++ b/cpp/managementgen/schema.py @@ -94,9 +94,23 @@ class SchemaType: if changeFlag != None: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); + elif self.accessor == "counterByOne": + stream.write (" inline void inc_" + varName + " (){\n"); + stream.write (" ++" + varName + ";\n") + if changeFlag != None: + stream.write (" " + changeFlag + " = true;\n") + stream.write (" }\n"); + stream.write (" inline void dec_" + varName + " (){\n"); + stream.write (" --" + varName + ";\n") + if changeFlag != None: + stream.write (" " + changeFlag + " = true;\n") + stream.write (" }\n"); elif self.accessor == "counter": stream.write (" inline void inc_" + varName + " (" + self.cpp + " by = 1){\n"); - stream.write (" " + varName + " += by;\n") + stream.write (" if (by == 1)\n") + stream.write (" ++" + varName + ";\n") + stream.write (" else\n") + stream.write (" " + varName + " += by;\n") if self.style == "wm": stream.write (" if (" + varName + "High < " + varName + ")\n") stream.write (" " + varName + "High = " + varName + ";\n") @@ -104,7 +118,10 @@ class SchemaType: stream.write (" " + changeFlag + " = true;\n") stream.write (" }\n"); stream.write (" inline void dec_" + varName + " (" + self.cpp + " by = 1){\n"); - stream.write (" " + varName + " -= by;\n") + stream.write (" if (by == 1)\n") + stream.write (" " + varName + "--;\n") + stream.write (" else\n") + stream.write (" " + varName + " -= by;\n") if self.style == "wm": stream.write (" if (" + varName + "Low > " + varName + ")\n") stream.write (" " + varName + "Low = " + varName + ";\n") @@ -196,16 +213,18 @@ class Type: #===================================================================================== class SchemaConfig: def __init__ (self, node, typespec): - self.name = None - self.type = None - self.access = "RO" - self.isIndex = 0 - self.isParentRef = 0 - self.unit = None - self.min = None - self.max = None - self.maxLen = None - self.desc = None + self.name = None + self.type = None + self.ref = None + self.access = "RO" + self.isIndex = 0 + self.isParentRef = 0 + self.isGeneralRef = 0 + self.unit = None + self.min = None + self.max = None + self.maxLen = None + self.desc = None attrs = node.attributes for idx in range (attrs.length): @@ -216,6 +235,9 @@ class SchemaConfig: elif key == 'type': self.type = Type (val, typespec) + + elif key == 'references': + self.ref = val elif key == 'access': self.access = val @@ -230,6 +252,11 @@ class SchemaConfig: raise ValueError ("Expected 'y' in parentRef attribute") self.isParentRef = 1 + elif key == 'isGeneralReference': + if val != 'y': + raise ValueError ("Expected 'y' in isGeneralReference attribute") + self.isGeneralRef = 1 + elif key == 'unit': self.unit = val @@ -246,12 +273,12 @@ class SchemaConfig: self.desc = val else: - raise ValueError ("Unknown attribute in configElement '%s'" % key) + raise ValueError ("Unknown attribute in property '%s'" % key) if self.name == None: - raise ValueError ("Missing 'name' attribute in configElement") + raise ValueError ("Missing 'name' attribute in property") if self.type == None: - raise ValueError ("Missing 'type' attribute in configElement") + raise ValueError ("Missing 'type' attribute in property") def getName (self): return self.name @@ -319,12 +346,12 @@ class SchemaInst: self.desc = val else: - raise ValueError ("Unknown attribute in instElement '%s'" % key) + raise ValueError ("Unknown attribute in statistic '%s'" % key) if self.name == None: - raise ValueError ("Missing 'name' attribute in instElement") + raise ValueError ("Missing 'name' attribute in statistic") if self.type == None: - raise ValueError ("Missing 'type' attribute in instElement") + raise ValueError ("Missing 'type' attribute in statistic") def getName (self): return self.name @@ -388,6 +415,8 @@ class SchemaInst: def genInitialize (self, stream): val = self.type.type.init + if self.type.type.accessor == "counterByOne": + return if self.type.type.style != "mma": stream.write (" " + self.name + " = " + val + ";\n") if self.type.type.style == "wm": @@ -589,13 +618,13 @@ class SchemaEvent: class SchemaClass: def __init__ (self, package, node, typespec, fragments, options): - self.packageName = package - self.configElements = [] - self.instElements = [] - self.methods = [] - self.events = [] - self.options = options - self.md5Sum = md5.new () + self.packageName = package + self.properties = [] + self.statistics = [] + self.methods = [] + self.events = [] + self.options = options + self.md5Sum = md5.new () self.hash (node) @@ -605,13 +634,13 @@ class SchemaClass: children = node.childNodes for child in children: if child.nodeType == Node.ELEMENT_NODE: - if child.nodeName == 'configElement': + if child.nodeName == 'property': sub = SchemaConfig (child, typespec) - self.configElements.append (sub) + self.properties.append (sub) - elif child.nodeName == 'instElement': + elif child.nodeName == 'statistic': sub = SchemaInst (child, typespec) - self.instElements.append (sub) + self.statistics.append (sub) elif child.nodeName == 'method': sub = SchemaMethod (self, child, typespec) @@ -644,10 +673,10 @@ class SchemaClass: name = attrs['name'].nodeValue for fragment in fragments: if fragment.name == name: - for config in fragment.configElements: - self.configElements.append (config) - for inst in fragment.instElements: - self.instElements.append (inst) + for config in fragment.properties: + self.properties.append (config) + for inst in fragment.statistics: + self.statistics.append (inst) for method in fragment.methods: self.methods.append (method) for event in fragment.events: @@ -675,33 +704,33 @@ class SchemaClass: # match the substitution keywords in the template files. #=================================================================================== def genAccessorMethods (self, stream, variables): - for config in self.configElements: + for config in self.properties: if config.access != "RC": config.genAccessor (stream) - for inst in self.instElements: + for inst in self.statistics: inst.genAccessor (stream) def genConfigCount (self, stream, variables): - stream.write ("%d" % len (self.configElements)) + stream.write ("%d" % len (self.properties)) def genConfigDeclarations (self, stream, variables): - for element in self.configElements: + for element in self.properties: element.genDeclaration (stream) def genConfigElementSchema (self, stream, variables): - for config in self.configElements: + for config in self.properties: config.genSchema (stream) def genConstructorArgs (self, stream, variables): # Constructor args are config elements with read-create access result = "" - for element in self.configElements: + for element in self.properties: if element.isConstructorArg (): stream.write (", ") element.genFormalParam (stream) def genConstructorInits (self, stream, variables): - for element in self.configElements: + for element in self.properties: if element.isConstructorArg (): stream.write ("," + element.getName () + "(_" + element.getName () + ")") @@ -729,21 +758,21 @@ class SchemaClass: pass ########################################################################### def genHiLoStatResets (self, stream, variables): - for inst in self.instElements: + for inst in self.statistics: inst.genHiLoStatResets (stream) def genInitializeElements (self, stream, variables): - for inst in self.instElements: + for inst in self.statistics: inst.genInitialize (stream) def genInstChangedStub (self, stream, variables): - if len (self.instElements) == 0: + if len (self.statistics) == 0: stream.write (" // Stub for getInstChanged. There are no inst elements\n") stream.write (" bool getInstChanged (void) { return false; }\n") def genInstCount (self, stream, variables): count = 0 - for inst in self.instElements: + for inst in self.statistics: count = count + 1 if inst.type.type.style == "wm": count = count + 2 @@ -752,11 +781,11 @@ class SchemaClass: stream.write ("%d" % count) def genInstDeclarations (self, stream, variables): - for element in self.instElements: + for element in self.statistics: element.genDeclaration (stream) def genInstElementSchema (self, stream, variables): - for inst in self.instElements: + for inst in self.statistics: inst.genSchema (stream) def genMethodArgIncludes (self, stream, variables): @@ -794,6 +823,10 @@ class SchemaClass: arg.name, "outBuf") + ";\n") stream.write (" return;\n }\n") + def genSetGeneralReferenceDeclaration (self, stream, variables): + for prop in self.properties: + if prop.isGeneralRef: + stream.write ("void setReference(uint64_t objectId) { " + prop.name + " = objectId; }\n") def genMethodIdDeclarations (self, stream, variables): number = 1 @@ -822,13 +855,13 @@ class SchemaClass: stream.write (self.name.upper ()) def genParentArg (self, stream, variables): - for config in self.configElements: + for config in self.properties: if config.isParentRef == 1: stream.write (", Manageable* _parent") return def genParentRefAssignment (self, stream, variables): - for config in self.configElements: + for config in self.properties: if config.isParentRef == 1: stream.write (config.getName () + \ " = _parent->GetManagementObject ()->getObjectId ();") @@ -842,11 +875,11 @@ class SchemaClass: stream.write (hex (ord (sum[idx]))) def genWriteConfig (self, stream, variables): - for config in self.configElements: + for config in self.properties: config.genWrite (stream); def genWriteInst (self, stream, variables): - for inst in self.instElements: + for inst in self.statistics: inst.genWrite (stream); diff --git a/cpp/managementgen/templates/Class.cpp b/cpp/managementgen/templates/Class.cpp index 733e29188e..699d8217b6 100644 --- a/cpp/managementgen/templates/Class.cpp +++ b/cpp/managementgen/templates/Class.cpp @@ -75,9 +75,9 @@ void /*MGEN:Class.NameCap*/::writeSchema (Buffer& buf) buf.putShort (/*MGEN:Class.MethodCount*/); // Method Count buf.putShort (/*MGEN:Class.EventCount*/); // Event Count - // Config Elements + // Properties /*MGEN:Class.ConfigElementSchema*/ - // Inst Elements + // Statistics /*MGEN:Class.InstElementSchema*/ // Methods /*MGEN:Class.MethodSchema*/ diff --git a/cpp/managementgen/templates/Class.h b/cpp/managementgen/templates/Class.h index 628a70d2d9..441571a174 100644 --- a/cpp/managementgen/templates/Class.h +++ b/cpp/managementgen/templates/Class.h @@ -26,6 +26,7 @@ #include "qpid/management/ManagementObject.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" +#include "qpid/sys/AtomicCount.h" namespace qpid { namespace management { @@ -38,9 +39,9 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject static std::string className; static uint8_t md5Sum[16]; - // Configuration Elements + // Properties /*MGEN:Class.ConfigDeclarations*/ - // Instrumentation Elements + // Statistics /*MGEN:Class.InstDeclarations*/ // Private Methods static void writeSchema (qpid::framing::Buffer& buf); @@ -61,6 +62,8 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject /*MGEN:Class.NameCap*/ (Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/); ~/*MGEN:Class.NameCap*/ (void); + /*MGEN:Class.SetGeneralReferenceDeclaration*/ + std::string getPackageName (void) { return packageName; } std::string getClassName (void) { return className; } uint8_t* getMd5Sum (void) { return md5Sum; } @@ -72,6 +75,5 @@ class /*MGEN:Class.NameCap*/ : public ManagementObject }; }} - #endif /*!_MANAGEMENT_/*MGEN:Class.NameUpper*/_*/ diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index b0e006aebc..53b828a709 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -528,6 +528,7 @@ nobase_include_HEADERS = \ qpid/sys/AggregateOutput.h \ qpid/sys/AsynchIO.h \ qpid/sys/AsynchIOHandler.h \ + qpid/sys/AtomicCount.h \ qpid/sys/AtomicValue.h \ qpid/sys/AtomicValue_gcc.h \ qpid/sys/AtomicValue_mutex.h \ diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index f3e103dfaf..18b2c52dad 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -39,7 +39,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, const management::ArgsLinkBridge& _args) : link(_link), id(_id), args(_args), mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest, - args.i_key, args.i_src_is_queue, args.i_src_is_local, + args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes)), listener(l), name(Uuid(true).str()), persistenceId(0) { @@ -61,10 +61,10 @@ void Bridge::create(ConnectionState& c) session->attach(name, false); session->commandPoint(0,0); - if (args.i_src_is_local) { + if (args.i_srcIsLocal) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() } else { - if (args.i_src_is_queue) { + if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); @@ -79,7 +79,7 @@ void Bridge::create(ConnectionState& c) queueSettings.setString("qpid.trace.exclude", args.i_excludes); } - bool durable = false;//should this be an arg, or would be use src_is_queue for durable queues? + bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? bool autoDelete = !durable;//auto delete transient queues? peer->getQueue().declare(queue, "", false, durable, true, autoDelete, queueSettings); peer->getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); @@ -150,8 +150,8 @@ void Bridge::encode(Buffer& buffer) const buffer.putShortString(args.i_src); buffer.putShortString(args.i_dest); buffer.putShortString(args.i_key); - buffer.putOctet(args.i_src_is_queue ? 1 : 0); - buffer.putOctet(args.i_src_is_local ? 1 : 0); + buffer.putOctet(args.i_srcIsQueue ? 1 : 0); + buffer.putOctet(args.i_srcIsLocal ? 1 : 0); buffer.putShortString(args.i_tag); buffer.putShortString(args.i_excludes); } @@ -165,8 +165,8 @@ uint32_t Bridge::encodedSize() const + args.i_src.size() + 1 + args.i_dest.size() + 1 + args.i_key.size() + 1 - + 1 // src_is_queue - + 1 // src_is_local + + 1 // srcIsQueue + + 1 // srcIsLocal + args.i_tag.size() + 1 + args.i_excludes.size() + 1; } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ea3d3547f5..61319f3c09 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -55,34 +55,34 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std mgmtId(mgmtId_), links(broker_.getLinks()) { - Manageable* parent = broker.GetVhostObject (); + Manageable* parent = broker.GetVhostObject(); if (isLink) - links.notifyConnection (mgmtId, this); + links.notifyConnection(mgmtId, this); if (parent != 0) { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent(); - if (agent.get () != 0) - mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink)); - agent->addObject (mgmtObject); + if (agent.get() != 0) + mgmtObject = management::Connection::shared_ptr(new management::Connection(this, parent, mgmtId, !isLink)); + agent->addObject(mgmtObject); } } -void Connection::requestIOProcessing (boost::function0<void> callback) +void Connection::requestIOProcessing(boost::function0<void> callback) { ioCallback = callback; out->activateOutput(); } -Connection::~Connection () +Connection::~Connection() { if (mgmtObject.get() != 0) mgmtObject->resourceDestroy(); if (isLink) - links.notifyClosed (mgmtId); + links.notifyClosed(mgmtId); } void Connection::received(framing::AMQFrame& frame){ @@ -98,21 +98,21 @@ void Connection::received(framing::AMQFrame& frame){ recordFromClient(frame); } -void Connection::recordFromServer (framing::AMQFrame& frame) +void Connection::recordFromServer(framing::AMQFrame& frame) { - if (mgmtObject.get () != 0) + if (mgmtObject.get() != 0) { - mgmtObject->inc_framesToClient (); - mgmtObject->inc_bytesToClient (frame.size ()); + mgmtObject->inc_framesToClient(); + mgmtObject->inc_bytesToClient(frame.size()); } } -void Connection::recordFromClient (framing::AMQFrame& frame) +void Connection::recordFromClient(framing::AMQFrame& frame) { - if (mgmtObject.get () != 0) + if (mgmtObject.get() != 0) { - mgmtObject->inc_framesFromClient (); - mgmtObject->inc_bytesFromClient (frame.size ()); + mgmtObject->inc_framesFromClient(); + mgmtObject->inc_bytesFromClient(frame.size()); } } @@ -129,6 +129,14 @@ string Connection::getAuthCredentials() if (!isLink) return string(); + if (mgmtObject.get() != 0) + { + if (links.getAuthMechanism(mgmtId) == "ANONYMOUS") + mgmtObject->set_authIdentity("anonymous"); + else + mgmtObject->set_authIdentity(links.getAuthIdentity(mgmtId)); + } + return links.getAuthCredentials(mgmtId); } @@ -138,6 +146,12 @@ void Connection::notifyConnectionForced(const string& text) links.notifyConnectionForced(mgmtId, text); } +void Connection::setUserId(const string& userId) +{ + ConnectionState::setUserId(userId); + mgmtObject->set_authIdentity(userId); +} + void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { @@ -177,7 +191,7 @@ bool Connection::doOutput() ioCallback = 0; if (mgmtClosing) - close (403, "Closed by Management Request", 0, 0); + close(403, "Closed by Management Request", 0, 0); else //then do other output as needed: return outputTasks.doOutput(); @@ -202,20 +216,20 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject::shared_ptr Connection::GetManagementObject (void) const +ManagementObject::shared_ptr Connection::GetManagementObject(void) const { return dynamic_pointer_cast<ManagementObject>(mgmtObject); } -Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) +Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - QPID_LOG (debug, "Connection::ManagementMethod [id=" << methodId << "]"); + QPID_LOG(debug, "Connection::ManagementMethod [id=" << methodId << "]"); switch (methodId) { - case management::Client::METHOD_CLOSE : + case management::Connection::METHOD_CLOSE : mgmtClosing = true; if (mgmtObject.get()) mgmtObject->set_closing(1); out->activateOutput(); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index e6e3d4d15e..9e713140dd 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -42,7 +42,7 @@ #include "ConnectionState.h" #include "SessionHandler.h" #include "qpid/management/Manageable.h" -#include "qpid/management/Client.h" +#include "qpid/management/Connection.h" #include <boost/ptr_container/ptr_map.hpp> @@ -88,6 +88,7 @@ class Connection : public sys::ConnectionInputHandler, std::string getAuthMechanism(); std::string getAuthCredentials(); void notifyConnectionForced(const std::string& text); + void setUserId(const string& uid); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; @@ -100,7 +101,7 @@ class Connection : public sys::ConnectionInputHandler, bool mgmtClosing; const std::string mgmtId; boost::function0<void> ioCallback; - management::Client::shared_ptr mgmtObject; + management::Connection::shared_ptr mgmtObject; LinkRegistry& links; }; diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 691d47d866..698f8123e8 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -56,7 +56,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable void setHeartbeat(uint16_t hb) { heartbeat = hb; } void setStagingThreshold(uint64_t st) { stagingThreshold = st; } - void setUserId(const string& uid) { userId = uid; } + virtual void setUserId(const string& uid) { userId = uid; } const string& getUserId() const { return userId; } Broker& getBroker() { return broker; } diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 72021b8d98..84a5362766 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -54,8 +54,8 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingKey].push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings(); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else{ @@ -78,8 +78,8 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c bindings.erase(routingKey); } if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index df723d2c8f..3483562292 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -53,8 +53,8 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, Binding::shared_ptr binding (new Binding ("", queue, this)); bindings.push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else { @@ -73,8 +73,8 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* if (i != bindings.end()) { bindings.erase(i); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 5196099ed5..20d9617c8f 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -90,8 +90,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co bindings.push_back(headerMap); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } else { @@ -115,8 +115,8 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, if (i != bindings.end()) { bindings.erase(i); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 6bcfcf77a3..08b9d8fe3e 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -341,8 +341,8 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args std::pair<Bridge::shared_ptr, bool> result = links->declare (host, port, iargs.i_durable, iargs.i_src, - iargs.i_dest, iargs.i_key, iargs.i_src_is_queue, - iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes); + iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue, + iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes); if (result.second && iargs.i_durable) store->create(*result.first); diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 455cc8452e..0703c276cf 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -87,8 +87,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, std::string& src, std::string& dest, std::string& key, - bool is_queue, - bool is_local, + bool isQueue, + bool isLocal, std::string& tag, std::string& excludes) { @@ -110,14 +110,14 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, management::ArgsLinkBridge args; Bridge::shared_ptr bridge; - args.i_durable = durable; - args.i_src = src; - args.i_dest = dest; - args.i_key = key; - args.i_src_is_queue = is_queue; - args.i_src_is_local = is_local; - args.i_tag = tag; - args.i_excludes = excludes; + args.i_durable = durable; + args.i_src = src; + args.i_dest = dest; + args.i_key = key; + args.i_srcIsQueue = isQueue; + args.i_srcIsLocal = isLocal; + args.i_tag = tag; + args.i_excludes = excludes; bridge = Bridge::shared_ptr (new Bridge (l->second.get(), l->second->nextChannel(), @@ -237,4 +237,14 @@ std::string LinkRegistry::getAuthCredentials(const std::string& key) return result; } +std::string LinkRegistry::getAuthIdentity(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l == links.end()) + return string(); + + return l->second->getUsername(); +} + diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index f902490ed3..242c0d58ba 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -81,8 +81,8 @@ namespace broker { std::string& src, std::string& dest, std::string& key, - bool is_queue, - bool is_local, + bool isQueue, + bool isLocal, std::string& id, std::string& excludes); @@ -113,6 +113,7 @@ namespace broker { void notifyConnectionForced (const std::string& key, const std::string& text); std::string getAuthMechanism (const std::string& key); std::string getAuthCredentials (const std::string& key); + std::string getAuthIdentity (const std::string& key); }; } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 2c9717caa0..0b26762697 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -130,19 +130,15 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); } }else { if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } @@ -157,13 +153,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); } if (store && !msg->isContentLoaded()) { @@ -176,13 +170,11 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); if (mgmtObject.get() != 0) { - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); mgmtObject->inc_msgDepth (); - mgmtObject->inc_byteDepth (msg->contentSize ()); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); @@ -367,8 +359,7 @@ void Queue::consume(Consumer& c, bool requestExclusive){ consumerCount++; if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); - mgmtObject->inc_consumers (); + mgmtObject->inc_consumerCount (); } } @@ -378,8 +369,7 @@ void Queue::cancel(Consumer& c){ consumerCount--; if(exclusive) exclusive = 0; if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); - mgmtObject->dec_consumers (); + mgmtObject->dec_consumerCount (); } } @@ -409,11 +399,9 @@ void Queue::pop(){ if (policy.get()) policy->dequeued(msg.payload->contentSize()); if (mgmtObject.get() != 0){ - sys::Mutex::ScopedLock mutex(mgmtObject->getLock()); mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize()); mgmtObject->dec_msgDepth (); - mgmtObject->dec_byteDepth (msg.payload->contentSize()); if (msg.payload->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize()); @@ -682,7 +670,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { if (inst) { ManagementObject::shared_ptr childObj = inst->GetManagementObject(); if (childObj.get() != 0) - mgmtObject->set_storeRef(childObj->getObjectId()); + childObj->setReference(mgmtObject->getObjectId()); } } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index dd8267a7d8..1ddb3f3026 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -111,7 +111,7 @@ void SessionState::attach(SessionHandler& h) { if (mgmtObject.get() != 0) { mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } } diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 1c4fa2ea7a..a16421b090 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -138,8 +138,8 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons Binding::shared_ptr binding (new Binding (routingKey, queue, this)); bindings[routingPattern].push_back(binding); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindings(); + mgmtExchange->inc_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->inc_bindingCount(); } return true; } @@ -159,8 +159,8 @@ bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, co qv.erase(q); if(qv.empty()) bindings.erase(bi); if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); - dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindings(); + mgmtExchange->dec_bindingCount(); + dynamic_pointer_cast<management::Queue>(queue->GetManagementObject())->dec_bindingCount(); } return true; } diff --git a/cpp/src/qpid/broker/XmlExchange.cpp b/cpp/src/qpid/broker/XmlExchange.cpp index 1d8f2ae8d8..8c4d4f79a4 100644 --- a/cpp/src/qpid/broker/XmlExchange.cpp +++ b/cpp/src/qpid/broker/XmlExchange.cpp @@ -97,7 +97,7 @@ bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const QPID_LOG(trace, "Bound successfully with query: " << queryText ); if (mgmtExchange.get() != 0) { - mgmtExchange->inc_bindings (); + mgmtExchange->inc_bindingCount(); } return true; } else{ @@ -128,7 +128,7 @@ bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, cons bindingsMap.erase(routingKey); } if (mgmtExchange.get() != 0) { - mgmtExchange->dec_bindings (); + mgmtExchange->dec_bindingCount(); } return true; } else { diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 0ddbd62350..a2802cf932 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -630,7 +630,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_sessionId (sessionId); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_sysId (systemId); + agent->mgmtObject->set_systemId (systemId); addObject (agent->mgmtObject); remoteAgents[sessionId] = agent; diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 6af5412b99..68d7e5c886 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -37,3 +37,6 @@ void ManagementObject::writeTimestamps (Buffer& buf) buf.putLongLong (destroyTime); buf.putLongLong (objectId); } + +void ManagementObject::setReference(uint64_t) {} + diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 2661cf2d96..1d54d606a4 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -92,6 +92,7 @@ class ManagementObject virtual void doMethod (std::string methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; + virtual void setReference(uint64_t objectId); virtual std::string getClassName (void) = 0; virtual std::string getPackageName (void) = 0; diff --git a/cpp/src/qpid/sys/AtomicCount.h b/cpp/src/qpid/sys/AtomicCount.h new file mode 100644 index 0000000000..d598b49427 --- /dev/null +++ b/cpp/src/qpid/sys/AtomicCount.h @@ -0,0 +1,52 @@ +#ifndef _posix_AtomicCount_h +#define _posix_AtomicCount_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/detail/atomic_count.hpp> +#include "ScopedIncrement.h" + +namespace qpid { +namespace sys { + +/** + * Atomic counter. + */ +class AtomicCount { + public: + typedef ::qpid::sys::ScopedDecrement<AtomicCount> ScopedDecrement; + typedef ::qpid::sys::ScopedIncrement<AtomicCount> ScopedIncrement; + + AtomicCount(long value = 0) : count(value) {} + + void operator++() { ++count ; } + + long operator--() { return --count; } + + operator long() const { return count; } + + private: + boost::detail::atomic_count count; +}; + + +}} + + +#endif // _posix_AtomicCount_h diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 0f52165587..b92df89839 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -172,7 +172,7 @@ class FederationTests(TestBase010): link = mgmt.get_object("link") mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", - "key":"", "tag":"", "excludes":"", "src_is_queue":1}) + "key":"", "tag":"", "excludes":"", "srcIsQueue":1}) sleep(6) bridge = mgmt.get_object("bridge") diff --git a/python/commands/qpid-config b/python/commands/qpid-config index 58a2272ed2..cc9315f7ea 100755 --- a/python/commands/qpid-config +++ b/python/commands/qpid-config @@ -145,7 +145,7 @@ class BrokerManager: print "=======================================================" for ex in exchanges: if self.match (ex.name, filter): - print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindings, ex.name) + print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name) def ExchangeListRecurse (self, filter): self.ConnectToBroker () @@ -185,12 +185,12 @@ class BrokerManager: fc = int (args[FILECOUNT]) print "%4c%9c%7c%10d%11dx%-14d%s" % \ (YN (q.durable), YN (q.autoDelete), - YN (q.exclusive), q.bindings, fc, fs, q.name) + YN (q.exclusive), q.bindingCount, fc, fs, q.name) else: if not _durable: print "%4c%9c%7c%10d %s" % \ (YN (q.durable), YN (q.autoDelete), - YN (q.exclusive), q.bindings, q.name) + YN (q.exclusive), q.bindingCount, q.name) def QueueListRecurse (self, filter): self.ConnectToBroker () diff --git a/python/commands/qpid-queue-stats b/python/commands/qpid-queue-stats index 437714fec4..98dfa7580a 100755 --- a/python/commands/qpid-queue-stats +++ b/python/commands/qpid-queue-stats @@ -126,14 +126,14 @@ class BrokerManager: deltaTime = float (obj.timestamps[0] - lastSample.timestamps[0]) enqueueRate = float (obj.msgTotalEnqueues - lastSample.msgTotalEnqueues) / (deltaTime / 1000000000.0) dequeueRate = float (obj.msgTotalDequeues - lastSample.msgTotalDequeues) / (deltaTime / 1000000000.0) - print "%-41s%10.2f%10d..%-10d%13.2f%13.2f" % \ - (name, deltaTime / 1000000000, obj.msgDepthLow, obj.msgDepthHigh, enqueueRate, dequeueRate) + print "%-41s%10.2f%11d%13.2f%13.2f" % \ + (name, deltaTime / 1000000000, obj.msgDepth, enqueueRate, dequeueRate) def Display (self): self.ConnectToBroker () - print "Queue Name Sec Depth Range Enq Rate Deq Rate" - print "===================================================================================================" + print "Queue Name Sec Depth Enq Rate Deq Rate" + print "========================================================================================" try: while True: sleep (1) diff --git a/python/commands/qpid-route b/python/commands/qpid-route index 4e8dbc3a77..2d5249ab78 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -201,14 +201,14 @@ class RouteManager: if _verbose: print "Creating inter-broker binding..." bridgeArgs = {} - bridgeArgs["durable"] = _durable - bridgeArgs["src"] = exchange - bridgeArgs["dest"] = exchange - bridgeArgs["key"] = routingKey - bridgeArgs["tag"] = id - bridgeArgs["excludes"] = excludes - bridgeArgs["src_is_queue"] = 0 - bridgeArgs["src_is_local"] = 0 + bridgeArgs["durable"] = _durable + bridgeArgs["src"] = exchange + bridgeArgs["dest"] = exchange + bridgeArgs["key"] = routingKey + bridgeArgs["tag"] = id + bridgeArgs["excludes"] = excludes + bridgeArgs["srcIsQueue"] = 0 + bridgeArgs["srcIsLocal"] = 0 res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs) if res.status == 4: print "Can't create a durable route on a non-durable link" diff --git a/specs/management-schema.xml b/specs/management-schema.xml index c3fbee615f..b1472bf284 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -33,7 +33,7 @@ RC => Read/Create, can be set at create time only, read-only thereafter RW => Read/Write - If access rights are omitted for a configElement, they are assumed to be RO. + If access rights are omitted for a property, they are assumed to be RO. --> @@ -44,14 +44,14 @@ System =============================================================== --> - <class name="system"> - <configElement name="sysId" index="y" type="uuid" access="RC"/> + <class name="System"> + <property name="systemId" index="y" type="uuid" access="RC"/> - <configElement name="osName" type="sstr" access="RO" desc="Operating System Name"/> - <configElement name="nodeName" type="sstr" access="RO" desc="Node Name"/> - <configElement name="release" type="sstr" access="RO"/> - <configElement name="version" type="sstr" access="RO"/> - <configElement name="machine" type="sstr" access="RO"/> + <property name="osName" type="sstr" access="RO" desc="Operating System Name"/> + <property name="nodeName" type="sstr" access="RO" desc="Node Name"/> + <property name="release" type="sstr" access="RO"/> + <property name="version" type="sstr" access="RO"/> + <property name="machine" type="sstr" access="RO"/> </class> @@ -60,19 +60,19 @@ Broker =============================================================== --> - <class name="broker"> - <configElement name="systemRef" type="objId" access="RC" index="y" desc="System ID" parentRef="y"/> - <configElement name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/> - <configElement name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/> - <configElement name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/> - <configElement name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/> - <configElement name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> - <configElement name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/> - <configElement name="clusterName" type="sstr" access="RO" + <class name="Broker"> + <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/> + <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/> + <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/> + <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/> + <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/> + <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/> + <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/> + <property name="clusterName" type="sstr" access="RO" desc="Name of cluster this server is a member of"/> - <configElement name="version" type="sstr" access="RO" desc="Running software version"/> - <configElement name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/> - <configElement name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/> + <property name="version" type="sstr" access="RO" desc="Running software version"/> + <property name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/> + <property name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/> <method name="joinCluster"> <arg name="clusterName" dir="I" type="sstr"/> @@ -101,11 +101,11 @@ Management Agent =============================================================== --> - <class name="agent"> - <configElement name="sessionId" type="uuid" access="RO" index="y" desc="Session ID for Agent"/> - <configElement name="label" type="sstr" access="RO" desc="Label for agent"/> - <configElement name="registeredTo" type="objId" access="RO" desc="Broker agent is registered to"/> - <configElement name="sysId" type="uuid" access="RO" desc="Identifier of system where agent resides"/> + <class name="Agent"> + <property name="sessionId" type="uuid" access="RO" index="y" desc="Session ID for Agent"/> + <property name="label" type="sstr" access="RO" desc="Label for agent"/> + <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/> + <property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/> </class> <!-- @@ -113,9 +113,9 @@ Virtual Host =============================================================== --> - <class name="vhost"> - <configElement name="brokerRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="name" type="sstr" access="RC" index="y"/> + <class name="Vhost"> + <property name="brokerRef" type="objId" references="Broker" access="RC" index="y" parentRef="y"/> + <property name="name" type="sstr" access="RC" index="y"/> </class> <!-- @@ -123,42 +123,40 @@ Queue =============================================================== --> - <class name="queue"> - <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="name" type="sstr" access="RC" index="y"/> - - <configElement name="durable" type="bool" access="RC"/> - <configElement name="autoDelete" type="bool" access="RC"/> - <configElement name="exclusive" type="bool" access="RC"/> - <configElement name="arguments" type="ftable" access="RO" desc="Arguments supplied in queue.declare"/> - <configElement name="storeRef" type="objId" access="RO" desc="Reference to persistent queue (if durable)"/> - - <instElement name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued"/> - <instElement name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued"/> - <instElement name="msgTxnEnqueues" type="count64" unit="message" desc="Transactional messages enqueued"/> - <instElement name="msgTxnDequeues" type="count64" unit="message" desc="Transactional messages dequeued"/> - <instElement name="msgPersistEnqueues" type="count64" unit="message" desc="Persistent messages enqueued"/> - <instElement name="msgPersistDequeues" type="count64" unit="message" desc="Persistent messages dequeued"/> - <instElement name="msgDepth" type="hilo32" unit="message" desc="Current size of queue in messages"/> - <instElement name="byteTotalEnqueues" type="count64" unit="octet" desc="Total messages enqueued"/> - <instElement name="byteTotalDequeues" type="count64" unit="octet" desc="Total messages dequeued"/> - <instElement name="byteTxnEnqueues" type="count64" unit="octet" desc="Transactional messages enqueued"/> - <instElement name="byteTxnDequeues" type="count64" unit="octet" desc="Transactional messages dequeued"/> - <instElement name="bytePersistEnqueues" type="count64" unit="octet" desc="Persistent messages enqueued"/> - <instElement name="bytePersistDequeues" type="count64" unit="octet" desc="Persistent messages dequeued"/> - <instElement name="byteDepth" type="hilo32" unit="octet" desc="Current size of queue in bytes"/> - <instElement name="enqueueTxnStarts" type="count64" unit="transaction" desc="Total enqueue transactions started "/> - <instElement name="enqueueTxnCommits" type="count64" unit="transaction" desc="Total enqueue transactions committed"/> - <instElement name="enqueueTxnRejects" type="count64" unit="transaction" desc="Total enqueue transactions rejected"/> - <instElement name="enqueueTxnCount" type="hilo32" unit="transaction" desc="Current pending enqueue transactions"/> - <instElement name="dequeueTxnStarts" type="count64" unit="transaction" desc="Total dequeue transactions started"/> - <instElement name="dequeueTxnCommits" type="count64" unit="transaction" desc="Total dequeue transactions committed"/> - <instElement name="dequeueTxnRejects" type="count64" unit="transaction" desc="Total dequeue transactions rejected"/> - <instElement name="dequeueTxnCount" type="hilo32" unit="transaction" desc="Current pending dequeue transactions"/> - <instElement name="consumers" type="hilo32" unit="consumer" desc="Current consumers on queue"/> - <instElement name="bindings" type="hilo32" unit="binding" desc="Current bindings"/> - <instElement name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> - <instElement name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/> + <class name="Queue"> + <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + + <property name="durable" type="bool" access="RC"/> + <property name="autoDelete" type="bool" access="RC"/> + <property name="exclusive" type="bool" access="RC"/> + <property name="arguments" type="ftable" access="RO" desc="Arguments supplied in queue.declare"/> + + <statistic name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued"/> + <statistic name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued"/> + <statistic name="msgTxnEnqueues" type="count64" unit="message" desc="Transactional messages enqueued"/> + <statistic name="msgTxnDequeues" type="count64" unit="message" desc="Transactional messages dequeued"/> + <statistic name="msgPersistEnqueues" type="count64" unit="message" desc="Persistent messages enqueued"/> + <statistic name="msgPersistDequeues" type="count64" unit="message" desc="Persistent messages dequeued"/> + <statistic name="msgDepth" type="atomic32" unit="message" desc="Current size of queue in messages"/> + <statistic name="byteTotalEnqueues" type="count64" unit="octet" desc="Total messages enqueued"/> + <statistic name="byteTotalDequeues" type="count64" unit="octet" desc="Total messages dequeued"/> + <statistic name="byteTxnEnqueues" type="count64" unit="octet" desc="Transactional messages enqueued"/> + <statistic name="byteTxnDequeues" type="count64" unit="octet" desc="Transactional messages dequeued"/> + <statistic name="bytePersistEnqueues" type="count64" unit="octet" desc="Persistent messages enqueued"/> + <statistic name="bytePersistDequeues" type="count64" unit="octet" desc="Persistent messages dequeued"/> + <statistic name="enqueueTxnStarts" type="count64" unit="transaction" desc="Total enqueue transactions started "/> + <statistic name="enqueueTxnCommits" type="count64" unit="transaction" desc="Total enqueue transactions committed"/> + <statistic name="enqueueTxnRejects" type="count64" unit="transaction" desc="Total enqueue transactions rejected"/> + <statistic name="enqueueTxnCount" type="hilo32" unit="transaction" desc="Current pending enqueue transactions"/> + <statistic name="dequeueTxnStarts" type="count64" unit="transaction" desc="Total dequeue transactions started"/> + <statistic name="dequeueTxnCommits" type="count64" unit="transaction" desc="Total dequeue transactions committed"/> + <statistic name="dequeueTxnRejects" type="count64" unit="transaction" desc="Total dequeue transactions rejected"/> + <statistic name="dequeueTxnCount" type="hilo32" unit="transaction" desc="Current pending dequeue transactions"/> + <statistic name="consumerCount" type="hilo32" unit="consumer" desc="Current consumers on queue"/> + <statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/> + <statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> + <statistic name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/> <method name="purge" desc="Discard all messages on queue"/> </class> @@ -168,20 +166,20 @@ Exchange =============================================================== --> - <class name="exchange"> - <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="name" type="sstr" access="RC" index="y"/> - <configElement name="type" type="sstr" access="RO"/> - <configElement name="durable" type="bool" access="RC"/> - - <instElement name="producers" type="hilo32" desc="Current producers on exchange"/> - <instElement name="bindings" type="hilo32" desc="Current bindings"/> - <instElement name="msgReceives" type="count64" desc="Total messages received"/> - <instElement name="msgDrops" type="count64" desc="Total messages dropped (no matching key)"/> - <instElement name="msgRoutes" type="count64" desc="Total routed messages"/> - <instElement name="byteReceives" type="count64" desc="Total bytes received"/> - <instElement name="byteDrops" type="count64" desc="Total bytes dropped (no matching key)"/> - <instElement name="byteRoutes" type="count64" desc="Total routed bytes"/> + <class name="Exchange"> + <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="type" type="sstr" access="RO"/> + <property name="durable" type="bool" access="RC"/> + + <statistic name="producerCount" type="hilo32" desc="Current producers on exchange"/> + <statistic name="bindingCount" type="hilo32" desc="Current bindings"/> + <statistic name="msgReceives" type="count64" desc="Total messages received"/> + <statistic name="msgDrops" type="count64" desc="Total messages dropped (no matching key)"/> + <statistic name="msgRoutes" type="count64" desc="Total routed messages"/> + <statistic name="byteReceives" type="count64" desc="Total bytes received"/> + <statistic name="byteDrops" type="count64" desc="Total bytes dropped (no matching key)"/> + <statistic name="byteRoutes" type="count64" desc="Total routed bytes"/> </class> <!-- @@ -189,13 +187,13 @@ Binding =============================================================== --> - <class name="binding"> - <configElement name="exchangeRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="queueRef" type="objId" access="RC" index="y"/> - <configElement name="bindingKey" type="sstr" access="RC" index="y"/> - <configElement name="arguments" type="ftable" access="RC"/> + <class name="Binding"> + <property name="exchangeRef" type="objId" references="Exchange" access="RC" index="y" parentRef="y"/> + <property name="queueRef" type="objId" references="Queue" access="RC" index="y"/> + <property name="bindingKey" type="sstr" access="RC" index="y"/> + <property name="arguments" type="ftable" access="RC"/> - <instElement name="msgMatched" type="count64"/> + <statistic name="msgMatched" type="count64"/> </class> <!-- @@ -203,17 +201,17 @@ Client =============================================================== --> - <class name="client"> - <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="address" type="sstr" access="RC" index="y"/> - <configElement name="incoming" type="bool" access="RC"/> - - <instElement name="closing" type="bool" desc="This client is closing by management request"/> - <instElement name="authIdentity" type="sstr"/> - <instElement name="framesFromClient" type="count64"/> - <instElement name="framesToClient" type="count64"/> - <instElement name="bytesFromClient" type="count64"/> - <instElement name="bytesToClient" type="count64"/> + <class name="Connection"> + <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> + <property name="address" type="sstr" access="RC" index="y"/> + <property name="incoming" type="bool" access="RC"/> + + <statistic name="closing" type="bool" desc="This client is closing by management request"/> + <statistic name="authIdentity" type="sstr"/> + <statistic name="framesFromClient" type="count64"/> + <statistic name="framesToClient" type="count64"/> + <statistic name="bytesFromClient" type="count64"/> + <statistic name="bytesToClient" type="count64"/> <method name="close"/> </class> @@ -223,30 +221,30 @@ Link =============================================================== --> - <class name="link"> + <class name="Link"> This class represents an inter-broker connection. - <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="host" type="sstr" access="RC" index="y"/> - <configElement name="port" type="uint16" access="RC" index="y"/> - <configElement name="useSsl" type="bool" access="RC"/> - <configElement name="durable" type="bool" access="RC"/> + <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> + <property name="host" type="sstr" access="RC" index="y"/> + <property name="port" type="uint16" access="RC" index="y"/> + <property name="useSsl" type="bool" access="RC"/> + <property name="durable" type="bool" access="RC"/> - <instElement name="state" type="sstr" desc="Operational state of the link"/> - <instElement name="lastError" type="sstr" desc="Reason link is not operational"/> + <statistic name="state" type="sstr" desc="Operational state of the link"/> + <statistic name="lastError" type="sstr" desc="Reason link is not operational"/> <method name="close"/> <method name="bridge" desc="Bridge messages over the link"> - <arg name="durable" dir="I" type="bool"/> - <arg name="src" dir="I" type="sstr"/> - <arg name="dest" dir="I" type="sstr"/> - <arg name="key" dir="I" type="sstr" default=""/> - <arg name="tag" dir="I" type="sstr" default=""/> - <arg name="excludes" dir="I" type="sstr" default=""/> - <arg name="src_is_queue" dir="I" type="bool" default="0"/> - <arg name="src_is_local" dir="I" type="bool" default="0"/> + <arg name="durable" dir="I" type="bool"/> + <arg name="src" dir="I" type="sstr"/> + <arg name="dest" dir="I" type="sstr"/> + <arg name="key" dir="I" type="sstr" default=""/> + <arg name="tag" dir="I" type="sstr" default=""/> + <arg name="excludes" dir="I" type="sstr" default=""/> + <arg name="srcIsQueue" dir="I" type="bool" default="0"/> + <arg name="srcIsLocal" dir="I" type="bool" default="0"/> </method> </class> @@ -256,17 +254,17 @@ Bridge =============================================================== --> - <class name="bridge"> - <configElement name="linkRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="channelId" type="uint16" access="RC" index="y"/> - <configElement name="durable" type="bool" access="RC"/> - <configElement name="src" type="sstr" access="RC"/> - <configElement name="dest" type="sstr" access="RC"/> - <configElement name="key" type="sstr" access="RC"/> - <configElement name="src_is_queue" type="bool" access="RC"/> - <configElement name="src_is_local" type="bool" access="RC"/> - <configElement name="tag" type="sstr" access="RC"/> - <configElement name="excludes" type="sstr" access="RC"/> + <class name="Bridge"> + <property name="linkRef" type="objId" references="Link" access="RC" index="y" parentRef="y"/> + <property name="channelId" type="uint16" access="RC" index="y"/> + <property name="durable" type="bool" access="RC"/> + <property name="src" type="sstr" access="RC"/> + <property name="dest" type="sstr" access="RC"/> + <property name="key" type="sstr" access="RC"/> + <property name="srcIsQueue" type="bool" access="RC"/> + <property name="srcIsLocal" type="bool" access="RC"/> + <property name="tag" type="sstr" access="RC"/> + <property name="excludes" type="sstr" access="RC"/> <method name="close"/> </class> @@ -276,72 +274,21 @@ Session =============================================================== --> - <class name="session"> - <configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="name" type="sstr" access="RC" index="y"/> - <configElement name="channelId" type="uint16" access="RO"/> - <configElement name="clientRef" type="objId" access="RO"/> - <configElement name="detachedLifespan" type="uint32" access="RO" unit="second"/> + <class name="Session"> + <property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="channelId" type="uint16" access="RO"/> + <property name="connectionRef" type="objId" references="Connection" access="RO"/> + <property name="detachedLifespan" type="uint32" access="RO" unit="second"/> - <instElement name="attached" type="bool"/> - <instElement name="expireTime" type="absTime"/> - <instElement name="framesOutstanding" type="count32"/> + <statistic name="attached" type="bool"/> + <statistic name="expireTime" type="absTime"/> + <statistic name="framesOutstanding" type="count32"/> <method name="solicitAck"/> <method name="detach"/> <method name="resetLifespan"/> <method name="close"/> </class> - - <!-- - =============================================================== - Destination - =============================================================== - --> - <class name="destination"> - <configElement name="sessionRef" type="objId" access="RC" index="y" parentRef="y"/> - <configElement name="name" type="sstr" access="RC" index="y"/> - - <instElement name="flowMode" type="uint8"/> - <instElement name="maxMsgCredits" type="uint32"/> - <instElement name="maxByteCredits" type="uint32"/> - <instElement name="msgCredits" type="uint32"/> - <instElement name="byteCredits" type="uint32"/> - - <method name="throttle" desc="Apply extra rate limiting to destination: 0 = Normal, 10 = Maximum"> - <arg name="strength" type="uint8" dir="I" min="0" max="10"/> - </method> - <method name="stop"/> - <method name="start"/> - </class> - - <!-- - =============================================================== - Producer - =============================================================== - --> - <class name="producer"> - <configElement name="destinationRef" access="RC" type="objId" index="y"/> - <configElement name="exchangeRef" access="RC" type="objId" index="y"/> - - <instElement name="msgsProduced" type="count64"/> - <instElement name="bytesProduced" type="count64"/> - </class> - - <!-- - =============================================================== - Consumer - =============================================================== - --> - <class name="consumer"> - <configElement name="destinationRef" access="RC" type="objId" index="y"/> - <configElement name="queueRef" access="RC" type="objId" index="y"/> - - <instElement name="msgsConsumed" type="count64"/> - <instElement name="bytesConsumed" type="count64"/> - <instElement name="unackedMessages" type="hilo32" desc="Messages consumed but not yet acked"/> - - <method name="close"/> - </class> </schema> diff --git a/specs/management-types.xml b/specs/management-types.xml index b3e08a612f..aad6b348c3 100644 --- a/specs/management-types.xml +++ b/specs/management-types.xml @@ -44,6 +44,8 @@ <type name="count32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" accessor="counter" init="0"/> <type name="count64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0"/> +<type name="atomic32" base="U32" cpp="qpid::sys::AtomicCount" encode="@.putLong(#)" decode="" accessor="counterByOne"/> + <!-- Min/Max/Average statistics --> <type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/> <type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/> |
