diff options
author | Ted Ross <tross@apache.org> | 2008-10-15 15:51:15 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-10-15 15:51:15 +0000 |
commit | d5913038788795bd964c534bd28983e5732c2fce (patch) | |
tree | b86ab7ab84ec2f25421c161e76ef2924043ca30a /cpp/src | |
parent | e173cf8c8bd0af424a2d087f02dfa83fcbf7029d (diff) | |
download | qpid-python-d5913038788795bd964c534bd28983e5732c2fce.tar.gz |
QPID-1360 - Scaling improvements for QMF
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704944 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 81 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 10 |
3 files changed, 80 insertions, 43 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 2f7a524a65..36e56e48ae 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -606,13 +606,11 @@ void ManagementAgentImpl::periodicProcessing() moveNewObjectsLH(); - if (clientWasAdded) - { + if (clientWasAdded) { clientWasAdded = false; for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); - iter++) - { + iter++) { ManagementObject* object = iter->second; object->setAllChanged(); } @@ -620,39 +618,64 @@ void ManagementAgentImpl::periodicProcessing() if (managementObjects.empty()) return; - + + // + // Clear the been-here flag on all objects in the map. + // for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) - { - ManagementObject* object = iter->second; + iter->second->setFlags(0); + + // + // Process the entire object map. + // + for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second; + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->isDeleted())) + continue; - if (object->getConfigChanged() || object->isDeleted()) - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); + Buffer msgBuffer(msgChars, BUFSIZE); + for (ManagementObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); - } + if (object->getConfigChanged() || object->isDeleted()) { + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + } - if (object->getInstChanged()) - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); + if (object->getInstChanged()) { + encodeHeader(msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + } + + if (object->isDeleted()) + deleteList.push_back(iter->first); - contentSize = BUFSIZE - msgBuffer.available(); + if (msgBuffer.available() < (BUFSIZE / 2)) + break; + } + } + + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { msgBuffer.reset(); - routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName(); + routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName(); connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - - if (object->isDeleted()) - deleteList.push_back(iter->first); } // Delete flagged objects @@ -737,7 +760,9 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); msg.setData(data); - session.messageTransfer(arg::content=msg, arg::destination=exchange); + try { + session.messageTransfer(arg::content=msg, arg::destination=exchange); + } catch(std::exception&) {} } void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 4a9882d827..7ced42f69b 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -291,26 +291,26 @@ bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementBroker::sendBuffer (Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - string routingKey) +void ManagementBroker::sendBuffer(Buffer& buf, + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey) { if (exchange.get() == 0) return; - intrusive_ptr<Message> msg (new Message ()); - AMQFrame method (in_place<MessageTransferBody>( + intrusive_ptr<Message> msg(new Message()); + AMQFrame method(in_place<MessageTransferBody>( ProtocolVersion(), exchange->getName (), 0, 0)); - AMQFrame header (in_place<AMQHeaderBody>()); + AMQFrame header(in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>()); content.castBody<AMQContentBody>()->decode(buf, length); - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); msg->getFrames().append(method); msg->getFrames().append(header); @@ -321,7 +321,9 @@ void ManagementBroker::sendBuffer (Buffer& buf, msg->getFrames().append(content); DeliverableMessage deliverable (msg); - exchange->route (deliverable, routingKey, 0); + try { + exchange->route(deliverable, routingKey, 0); + } catch(std::exception&) {} } void ManagementBroker::moveNewObjectsLH() @@ -385,7 +387,7 @@ void ManagementBroker::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName (); + routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName (); sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } @@ -397,7 +399,7 @@ void ManagementBroker::periodicProcessing (void) contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); - routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName (); + routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName (); sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } @@ -1018,7 +1020,7 @@ void ManagementBroker::addClassLH(uint8_t kind, return; // No such class found, create a new class with local information. - QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << + QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" << key.name); cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index fa2025112f..a34f50ab8f 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -122,6 +122,7 @@ class ManagementObject : public ManagementItem sys::Mutex accessLock; ManagementAgent* agent; int maxThreads; + uint32_t flags; static int nextThreadIndex; @@ -164,6 +165,15 @@ class ManagementObject : public ManagementItem deleted = true; } inline bool isDeleted (void) { return deleted; } + inline void setFlags(uint32_t f) { flags = f; } + inline uint32_t getFlags() { return flags; } + bool isSameClass(ManagementObject& other) { + for (int idx = 0; idx < 16; idx++) + if (other.getMd5Sum()[idx] != getMd5Sum()[idx]) + return false; + return other.getClassName() == getClassName() && + other.getPackageName() == getPackageName(); + } }; typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap; |