summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-15 15:51:15 +0000
committerTed Ross <tross@apache.org>2008-10-15 15:51:15 +0000
commitd5913038788795bd964c534bd28983e5732c2fce (patch)
treeb86ab7ab84ec2f25421c161e76ef2924043ca30a /cpp/src
parente173cf8c8bd0af424a2d087f02dfa83fcbf7029d (diff)
downloadqpid-python-d5913038788795bd964c534bd28983e5732c2fce.tar.gz
QPID-1360 - Scaling improvements for QMF
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704944 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp81
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp32
-rw-r--r--cpp/src/qpid/management/ManagementObject.h10
3 files changed, 80 insertions, 43 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 2f7a524a65..36e56e48ae 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -606,13 +606,11 @@ void ManagementAgentImpl::periodicProcessing()
moveNewObjectsLH();
- if (clientWasAdded)
- {
+ if (clientWasAdded) {
clientWasAdded = false;
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
- iter++)
- {
+ iter++) {
ManagementObject* object = iter->second;
object->setAllChanged();
}
@@ -620,39 +618,64 @@ void ManagementAgentImpl::periodicProcessing()
if (managementObjects.empty())
return;
-
+
+ //
+ // Clear the been-here flag on all objects in the map.
+ //
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
iter++)
- {
- ManagementObject* object = iter->second;
+ iter->second->setFlags(0);
+
+ //
+ // Process the entire object map.
+ //
+ for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
+ baseIter != managementObjects.end();
+ baseIter++) {
+ ManagementObject* baseObject = baseIter->second;
+
+ //
+ // Skip until we find a base object requiring a sent message.
+ //
+ if (baseObject->getFlags() == 1 ||
+ (!baseObject->getConfigChanged() &&
+ !baseObject->getInstChanged() &&
+ !baseObject->isDeleted()))
+ continue;
- if (object->getConfigChanged() || object->isDeleted())
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ for (ManagementObjectMap::iterator iter = baseIter;
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
- }
+ if (object->getConfigChanged() || object->isDeleted()) {
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(msgBuffer);
+ }
- if (object->getInstChanged())
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
+ if (object->getInstChanged()) {
+ encodeHeader(msgBuffer, 'i');
+ object->writeStatistics(msgBuffer);
+ }
+
+ if (object->isDeleted())
+ deleteList.push_back(iter->first);
- contentSize = BUFSIZE - msgBuffer.available();
+ if (msgBuffer.available() < (BUFSIZE / 2))
+ break;
+ }
+ }
+
+ contentSize = BUFSIZE - msgBuffer.available();
+ if (contentSize > 0) {
msgBuffer.reset();
- routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName();
+ routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName();
connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
-
- if (object->isDeleted())
- deleteList.push_back(iter->first);
}
// Delete flagged objects
@@ -737,7 +760,9 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
msg.setData(data);
- session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ try {
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+ } catch(std::exception&) {}
}
void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 4a9882d827..7ced42f69b 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -291,26 +291,26 @@ bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementBroker::sendBuffer (Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- string routingKey)
+void ManagementBroker::sendBuffer(Buffer& buf,
+ uint32_t length,
+ qpid::broker::Exchange::shared_ptr exchange,
+ string routingKey)
{
if (exchange.get() == 0)
return;
- intrusive_ptr<Message> msg (new Message ());
- AMQFrame method (in_place<MessageTransferBody>(
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(
ProtocolVersion(), exchange->getName (), 0, 0));
- AMQFrame header (in_place<AMQHeaderBody>());
+ AMQFrame header(in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>());
content.castBody<AMQContentBody>()->decode(buf, length);
- method.setEof (false);
- header.setBof (false);
- header.setEof (false);
- content.setBof (false);
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
msg->getFrames().append(method);
msg->getFrames().append(header);
@@ -321,7 +321,9 @@ void ManagementBroker::sendBuffer (Buffer& buf,
msg->getFrames().append(content);
DeliverableMessage deliverable (msg);
- exchange->route (deliverable, routingKey, 0);
+ try {
+ exchange->route(deliverable, routingKey, 0);
+ } catch(std::exception&) {}
}
void ManagementBroker::moveNewObjectsLH()
@@ -385,7 +387,7 @@ void ManagementBroker::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.prop." + object->getPackageName() + "." + object->getClassName ();
+ routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName ();
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -397,7 +399,7 @@ void ManagementBroker::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.stat." + object->getPackageName() + "." + object->getClassName ();
+ routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName ();
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -1018,7 +1020,7 @@ void ManagementBroker::addClassLH(uint8_t kind,
return;
// No such class found, create a new class with local information.
- QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
+ QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" <<
key.name);
cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index fa2025112f..a34f50ab8f 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -122,6 +122,7 @@ class ManagementObject : public ManagementItem
sys::Mutex accessLock;
ManagementAgent* agent;
int maxThreads;
+ uint32_t flags;
static int nextThreadIndex;
@@ -164,6 +165,15 @@ class ManagementObject : public ManagementItem
deleted = true;
}
inline bool isDeleted (void) { return deleted; }
+ inline void setFlags(uint32_t f) { flags = f; }
+ inline uint32_t getFlags() { return flags; }
+ bool isSameClass(ManagementObject& other) {
+ for (int idx = 0; idx < 16; idx++)
+ if (other.getMd5Sum()[idx] != getMd5Sum()[idx])
+ return false;
+ return other.getClassName() == getClassName() &&
+ other.getPackageName() == getPackageName();
+ }
};
typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;