From a3aaa263858f07d37e5860136300f76fab8d7ecd Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 23 Nov 2007 13:37:42 +0000 Subject: QPID-689 from tross@redhat.com. This patch introduces formal schema specification for management and code generation for management classes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@597662 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.cpp | 37 +++++++++++++++++++--- cpp/src/qpid/broker/Queue.cpp | 69 ++++++++++++++++++++++++++++-------------- cpp/src/qpid/broker/Vhost.cpp | 2 +- 3 files changed, 80 insertions(+), 28 deletions(-) (limited to 'cpp/src/qpid/broker') diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d61100d255..4e22cb7352 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -19,6 +19,7 @@ * */ +#include "config.h" #include "Broker.h" #include "Connection.h" #include "DirectExchange.h" @@ -124,7 +125,19 @@ Broker::Broker(const Broker::Options& conf) : managementAgent = ManagementAgent::getAgent (); managementAgent->setInterval (conf.mgmtPubInterval); - mgmtObject = management::Broker::shared_ptr (new management::Broker (this, conf)); + mgmtObject = management::Broker::shared_ptr (new management::Broker (this, 0, 0, conf.port)); + mgmtObject->set_workerThreads (conf.workerThreads); + mgmtObject->set_maxConns (conf.maxConnections); + mgmtObject->set_connBacklog (conf.connectionBacklog); + mgmtObject->set_stagingThreshold (conf.stagingThreshold); + mgmtObject->set_storeLib (conf.store); + mgmtObject->set_asyncStore (conf.storeAsync); + mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval); + mgmtObject->set_initialDiskPageSize (0); + mgmtObject->set_initialPagesPerQueue (0); + mgmtObject->set_clusterName (""); + mgmtObject->set_version (PACKAGE_VERSION); + managementAgent->addObject (mgmtObject); // Since there is currently no support for virtual hosts, a placeholder object @@ -248,11 +261,27 @@ ManagementObject::shared_ptr Broker::GetManagementObject(void) const return dynamic_pointer_cast (mgmtObject); } -Manageable::status_t Broker::ManagementMethod (uint32_t /*methodId*/, +Manageable::status_t Broker::ManagementMethod (uint32_t methodId, Args& /*_args*/) { - QPID_LOG (debug, "Broker::ManagementMethod"); - return Manageable::STATUS_OK; + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + QPID_LOG (debug, "Broker::ManagementMethod [id=" << methodId << "]"); + + switch (methodId) + { + case management::Broker::METHOD_ECHO : + status = Manageable::STATUS_OK; + break; + + case management::Broker::METHOD_JOINCLUSTER : + case management::Broker::METHOD_LEAVECLUSTER : + case management::Broker::METHOD_CRASH : + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 41a5767457..376b9367d0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete, if (parent != 0) { mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete)); + (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0)); ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); agent->addObject (mgmtObject); @@ -92,11 +92,21 @@ void Queue::deliver(intrusive_ptr& msg){ if (!enqueue(0, msg)){ push(msg); msg->enqueueComplete(); - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize ()); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + } }else { - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + } push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -108,8 +118,15 @@ void Queue::deliver(intrusive_ptr& msg){ void Queue::recover(intrusive_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + } + if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -118,15 +135,19 @@ void Queue::recover(intrusive_ptr& msg){ } void Queue::process(intrusive_ptr& msg){ - - uint32_t mask = management::MSG_MASK_TX; - - if (msg->isPersistent ()) - mask |= management::MSG_MASK_PERSIST; - push(msg); - if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), mask); + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + mgmtObject->inc_msgTxnEnqueues (); + mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + mgmtObject->inc_msgDepth (); + mgmtObject->inc_byteDepth (msg->contentSize ()); + if (msg->isPersistent ()) { + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + } + } serializer.execute(dispatchCallback); } @@ -309,7 +330,7 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){ } if (mgmtObject != 0){ - mgmtObject->incConsumers (); + mgmtObject->inc_consumers (); } } @@ -321,7 +342,7 @@ void Queue::cancel(Consumer::ptr c){ cancel(c, browsers); } if (mgmtObject != 0){ - mgmtObject->decConsumers (); + mgmtObject->dec_consumers (); } if(exclusive == c) exclusive.reset(); } @@ -341,12 +362,14 @@ QueuedMessage Queue::dequeue(){ msg = messages.front(); pop(); if (mgmtObject != 0){ - uint32_t mask = 0; - - if (msg.payload->isPersistent ()) - mask |= management::MSG_MASK_PERSIST; - - mgmtObject->dequeue (msg.payload->contentSize (), mask); + mgmtObject->inc_msgTotalDequeues (); + //mgmtObject->inc_byteTotalDequeues (msg->contentSize ()); + mgmtObject->dec_msgDepth (); + //mgmtObject->dec_byteDepth (msg->contentSize ()); + if (0){//msg->isPersistent ()) { + mgmtObject->inc_msgPersistDequeues (); + //mgmtObject->inc_bytePersistDequeues (msg->contentSize ()); + } } } return msg; diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp index bf0521904f..635f345a86 100644 --- a/cpp/src/qpid/broker/Vhost.cpp +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -28,7 +28,7 @@ Vhost::Vhost (management::Manageable* parentBroker) if (parentBroker != 0) { mgmtObject = management::Vhost::shared_ptr - (new management::Vhost (this, parentBroker)); + (new management::Vhost (this, parentBroker, "/")); ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); agent->addObject (mgmtObject); -- cgit v1.2.1