From d5913038788795bd964c534bd28983e5732c2fce Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Wed, 15 Oct 2008 15:51:15 +0000 Subject: QPID-1360 - Scaling improvements for QMF git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704944 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/agent/ManagementAgentImpl.cpp | 81 +++++++++++++++++++----------- 1 file changed, 53 insertions(+), 28 deletions(-) (limited to 'cpp/src/qpid/agent') diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 2f7a524a65..36e56e48ae 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -606,13 +606,11 @@ void ManagementAgentImpl::periodicProcessing() moveNewObjectsLH(); - if (clientWasAdded) - { + if (clientWasAdded) { clientWasAdded = false; for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); - iter++) - { + iter++) { ManagementObject* object = iter->second; object->setAllChanged(); } @@ -620,39 +618,64 @@ void ManagementAgentImpl::periodicProcessing() if (managementObjects.empty()) return; - + + // + // Clear the been-here flag on all objects in the map. + // for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) - { - ManagementObject* object = iter->second; + iter->second->setFlags(0); + + // + // Process the entire object map. + // + for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second; + + // + // Skip until we find a base object requiring a sent message. + // + if (baseObject->getFlags() == 1 || + (!baseObject->getConfigChanged() && + !baseObject->getInstChanged() && + !baseObject->isDeleted())) + continue; - if (object->getConfigChanged() || object->isDeleted()) - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); + Buffer msgBuffer(msgChars, BUFSIZE); + for (ManagementObjectMap::iterator iter = baseIter; + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (baseObject->isSameClass(*object) && object->getFlags() == 0) { + object->setFlags(1); - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); - } + if (object->getConfigChanged() || object->isDeleted()) { + encodeHeader(msgBuffer, 'c'); + object->writeProperties(msgBuffer); + } - if (object->getInstChanged()) - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); + if (object->getInstChanged()) { + encodeHeader(msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + } + + if (object->isDeleted()) + deleteList.push_back(iter->first); - contentSize = BUFSIZE - msgBuffer.available(); + if (msgBuffer.available() < (BUFSIZE / 2)) + break; + } + } + + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { msgBuffer.reset(); - routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName(); + routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName(); connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - - if (object->isDeleted()) - deleteList.push_back(iter->first); } // Delete flagged objects @@ -737,7 +760,9 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); msg.setData(data); - session.messageTransfer(arg::content=msg, arg::destination=exchange); + try { + session.messageTransfer(arg::content=msg, arg::destination=exchange); + } catch(std::exception&) {} } void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) -- cgit v1.2.1