diff options
| author | Ted Ross <tross@apache.org> | 2008-10-15 15:51:15 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-10-15 15:51:15 +0000 |
| commit | d5913038788795bd964c534bd28983e5732c2fce (patch) | |
| tree | b86ab7ab84ec2f25421c161e76ef2924043ca30a /cpp/src/qpid/agent | |
| parent | e173cf8c8bd0af424a2d087f02dfa83fcbf7029d (diff) | |
| download | qpid-python-d5913038788795bd964c534bd28983e5732c2fce.tar.gz | |
QPID-1360 - Scaling improvements for QMF
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704944 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 81 |
1 files changed, 53 insertions, 28 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 2f7a524a65..36e56e48ae 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -606,13 +606,11 @@ void ManagementAgentImpl::periodicProcessing() moveNewObjectsLH(); - if (clientWasAdded) - { + if (clientWasAdded) { clientWasAdded = false; for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); - iter++) - { + iter++) { ManagementObject* object = iter->second; object->setAllChanged(); } @@ -620,39 +618,64 @@ void ManagementAgentImpl::periodicProcessing() if (managementObjects.empty()) return; - + + // + // Clear the been-here flag on all objects in the map. + // for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) - { - ManagementObject* object = iter->second; + iter->second->setFlags(0); + + // + // Process the entire object map. + // + for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second; + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->isDeleted())) + continue; - if (object->getConfigChanged() || object->isDeleted()) - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); + Buffer msgBuffer(msgChars, BUFSIZE); + for (ManagementObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); - } + if (object->getConfigChanged() || object->isDeleted()) { + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + } - if (object->getInstChanged()) - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); + if (object->getInstChanged()) { + encodeHeader(msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + } + + if (object->isDeleted()) + deleteList.push_back(iter->first); - contentSize = BUFSIZE - msgBuffer.available(); + if (msgBuffer.available() < (BUFSIZE / 2)) + break; + } + } + + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { msgBuffer.reset(); - routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName(); + routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName(); connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - - if (object->isDeleted()) - deleteList.push_back(iter->first); } // Delete flagged objects @@ -737,7 +760,9 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); msg.setData(data); - session.messageTransfer(arg::content=msg, arg::destination=exchange); + try { + session.messageTransfer(arg::content=msg, arg::destination=exchange); + } catch(std::exception&) {} } void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) |
