summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/agent
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-15 15:51:15 +0000
committerTed Ross <tross@apache.org>2008-10-15 15:51:15 +0000
commitd5913038788795bd964c534bd28983e5732c2fce (patch)
treeb86ab7ab84ec2f25421c161e76ef2924043ca30a /cpp/src/qpid/agent
parente173cf8c8bd0af424a2d087f02dfa83fcbf7029d (diff)
downloadqpid-python-d5913038788795bd964c534bd28983e5732c2fce.tar.gz
QPID-1360 - Scaling improvements for QMF
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704944 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp81
1 files changed, 53 insertions, 28 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 2f7a524a65..36e56e48ae 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -606,13 +606,11 @@ void ManagementAgentImpl::periodicProcessing()
moveNewObjectsLH();
- if (clientWasAdded)
- {
+ if (clientWasAdded) {
clientWasAdded = false;
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
- iter++)
- {
+ iter++) {
ManagementObject* object = iter->second;
object->setAllChanged();
}
@@ -620,39 +618,64 @@ void ManagementAgentImpl::periodicProcessing()
if (managementObjects.empty())
return;
-
+
+ //
+ // Clear the been-here flag on all objects in the map.
+ //
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
iter++)
- {
- ManagementObject* object = iter->second;
+ iter->second->setFlags(0);
+
+ //
+ // Process the entire object map.
+ //
+ for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
+ baseIter != managementObjects.end();
+ baseIter++) {
+ ManagementObject* baseObject = baseIter->second;
+
+ //
+ // Skip until we find a base object requiring a sent message.
+ //
+ if (baseObject->getFlags() == 1 ||
+ (!baseObject->getConfigChanged() &&
+ !baseObject->getInstChanged() &&
+ !baseObject->isDeleted()))
+ continue;
- if (object->getConfigChanged() || object->isDeleted())
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ for (ManagementObjectMap::iterator iter = baseIter;
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
- }
+ if (object->getConfigChanged() || object->isDeleted()) {
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+ }
- if (object->getInstChanged())
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
+ if (object->getInstChanged()) {
+ encodeHeader(msgBuffer, 'i');
+ object->writeStatistics(msgBuffer);
+ }
+
+ if (object->isDeleted())
+ deleteList.push_back(iter->first);
- contentSize = BUFSIZE - msgBuffer.available();
+ if (msgBuffer.available() < (BUFSIZE / 2))
+ break;
+ }
+ }
+
+ contentSize = BUFSIZE - msgBuffer.available();
+ if (contentSize > 0) {
msgBuffer.reset();
- routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName();
+ routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName();
connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
-
- if (object->isDeleted())
- deleteList.push_back(iter->first);
}
// Delete flagged objects
@@ -737,7 +760,9 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
msg.setData(data);
- session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ try {
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ } catch(std::exception&) {}
}
void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)