summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-08-12 19:59:19 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-08-12 19:59:19 +0000
commit53037e59d4bc9d83d841d63f9bc7e2b1b72fa232 (patch)
tree3bf04c4eba3754f6505682a5d94875d6f4f2a7de /cpp
parent0539fb89df2c2822f14bcc473911c6ad3aa0e9a5 (diff)
downloadqpid-python-53037e59d4bc9d83d841d63f9bc7e2b1b72fa232.tar.gz
QPID-2791: batch up data indications and replies
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@984935 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp68
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h4
2 files changed, 47 insertions, 25 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 50301f77c8..15e17e13f3 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -106,6 +106,7 @@ ManagementAgentImpl::ManagementAgentImpl() :
schemaTimestamp(Duration(EPOCH, now())),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
+ maxV2ReplyObjs(10), // KAG todo: make this a tuneable parameter
connThreadBody(*this), connThread(connThreadBody),
pubThreadBody(*this), pubThread(pubThreadBody)
{
@@ -677,6 +678,7 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
packageName = s_iter->second.asString();
+ unsigned int objCount = 0;
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
iter++) {
@@ -684,10 +686,9 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
if (object->getClassName() == className &&
(packageName.empty() || object->getPackageName() == packageName)) {
- // @todo support multiple object reply per message
values.clear();
- list_.clear();
oidMap.clear();
+ map_.clear();
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
@@ -702,16 +703,20 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
object->getMd5Sum());
list_.push_back(map_);
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ if (++objCount >= maxV2ReplyObjs) {
+ objCount = 0;
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ content.clear();
+ list_.clear();
+ }
}
}
}
}
- // Send empty "non-partial" message to indicate CommandComplete
- list_.clear();
+ // Send last "non-partial" message to indicate CommandComplete
headers.erase("partial");
ListCodec::encode(list_, content);
connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
@@ -971,6 +976,8 @@ void ManagementAgentImpl::periodicProcessing()
if (!connected)
return;
+ sendHeartbeat();
+
moveNewObjectsLH();
//
@@ -991,6 +998,8 @@ void ManagementAgentImpl::periodicProcessing()
//
// Process the entire object map.
//
+ uint32_t v2Objs = 0;
+
for (ManagementObjectMap::iterator baseIter = managementObjects.begin();
baseIter != managementObjects.end();
baseIter++) {
@@ -1010,6 +1019,21 @@ void ManagementAgentImpl::periodicProcessing()
std::string className = baseObject->getClassName();
Variant::List list_;
+ string content;
+ std::stringstream addr_key;
+ Variant::Map headers;
+
+ addr_key << addr_key_base;
+ addr_key << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
@@ -1038,6 +1062,16 @@ void ManagementAgentImpl::periodicProcessing()
object->mapEncodeValues(values, send_props, send_stats);
map_["_values"] = values;
list_.push_back(map_);
+
+ if (++v2Objs >= maxV2ReplyObjs) {
+ v2Objs = 0;
+ ListCodec::encode(list_, content);
+
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
+ list_.clear();
+ content.clear();
+ QPID_LOG(trace, "SENT DataIndication");
+ }
}
if (object->isDeleted())
@@ -1046,23 +1080,8 @@ void ManagementAgentImpl::periodicProcessing()
}
}
- string content;
- ListCodec::encode(list_, content);
- if (content.length()) {
- Variant::Map headers;
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- std::stringstream addr_key;
- addr_key << addr_key_base;
- addr_key << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
+ if (!list_.empty()) {
+ ListCodec::encode(list_, content);
connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
QPID_LOG(trace, "SENT DataIndication");
}
@@ -1077,7 +1096,6 @@ void ManagementAgentImpl::periodicProcessing()
}
deleteList.clear();
- sendHeartbeat();
}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index 7086e67459..477526c882 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -181,6 +181,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
uint32_t assignedAgentBank;
uint16_t bootSequence;
+ // Maximum # of objects allowed in a single V2 response
+ // message.
+ uint32_t maxV2ReplyObjs;
+
static const uint8_t DEBUG_OFF = 0;
static const uint8_t DEBUG_CONN = 1;
static const uint8_t DEBUG_PROTO = 2;