diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-08-12 19:59:19 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-08-12 19:59:19 +0000 |
commit | 53037e59d4bc9d83d841d63f9bc7e2b1b72fa232 (patch) | |
tree | 3bf04c4eba3754f6505682a5d94875d6f4f2a7de /cpp | |
parent | 0539fb89df2c2822f14bcc473911c6ad3aa0e9a5 (diff) | |
download | qpid-python-53037e59d4bc9d83d841d63f9bc7e2b1b72fa232.tar.gz |
QPID-2791: batch up data indications and replies
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@984935 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 68 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 4 |
2 files changed, 47 insertions, 25 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 50301f77c8..15e17e13f3 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -106,6 +106,7 @@ ManagementAgentImpl::ManagementAgentImpl() : schemaTimestamp(Duration(EPOCH, now())), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), + maxV2ReplyObjs(10), // KAG todo: make this a tuneable parameter connThreadBody(*this), connThread(connThreadBody), pubThreadBody(*this), pubThread(pubThreadBody) { @@ -677,6 +678,7 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) packageName = s_iter->second.asString(); + unsigned int objCount = 0; for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) { @@ -684,10 +686,9 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, if (object->getClassName() == className && (packageName.empty() || object->getPackageName() == packageName)) { - // @todo support multiple object reply per message values.clear(); - list_.clear(); oidMap.clear(); + map_.clear(); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); @@ -702,16 +703,20 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, object->getMd5Sum()); list_.push_back(map_); - ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + if (++objCount >= maxV2ReplyObjs) { + objCount = 0; + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + content.clear(); + list_.clear(); + } } } } } - // Send empty "non-partial" message to indicate CommandComplete - list_.clear(); + // Send last "non-partial" message to indicate CommandComplete headers.erase("partial"); ListCodec::encode(list_, content); connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); @@ -971,6 +976,8 @@ void ManagementAgentImpl::periodicProcessing() if (!connected) return; + sendHeartbeat(); + moveNewObjectsLH(); // @@ -991,6 +998,8 @@ void ManagementAgentImpl::periodicProcessing() // // Process the entire object map. // + uint32_t v2Objs = 0; + for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); baseIter != managementObjects.end(); baseIter++) { @@ -1010,6 +1019,21 @@ void ManagementAgentImpl::periodicProcessing() std::string className = baseObject->getClassName(); Variant::List list_; + string content; + std::stringstream addr_key; + Variant::Map headers; + + addr_key << addr_key_base; + addr_key << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey + << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); @@ -1038,6 +1062,16 @@ void ManagementAgentImpl::periodicProcessing() object->mapEncodeValues(values, send_props, send_stats); map_["_values"] = values; list_.push_back(map_); + + if (++v2Objs >= maxV2ReplyObjs) { + v2Objs = 0; + ListCodec::encode(list_, content); + + connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list"); + list_.clear(); + content.clear(); + QPID_LOG(trace, "SENT DataIndication"); + } } if (object->isDeleted()) @@ -1046,23 +1080,8 @@ void ManagementAgentImpl::periodicProcessing() } } - string content; - ListCodec::encode(list_, content); - if (content.length()) { - Variant::Map headers; - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - std::stringstream addr_key; - addr_key << addr_key_base; - addr_key << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey - << "." << instanceNameKey; - + if (!list_.empty()) { + ListCodec::encode(list_, content); connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list"); QPID_LOG(trace, "SENT DataIndication"); } @@ -1077,7 +1096,6 @@ void ManagementAgentImpl::periodicProcessing() } deleteList.clear(); - sendHeartbeat(); } diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 7086e67459..477526c882 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -181,6 +181,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint32_t assignedAgentBank; uint16_t bootSequence; + // Maximum # of objects allowed in a single V2 response + // message. + uint32_t maxV2ReplyObjs; + static const uint8_t DEBUG_OFF = 0; static const uint8_t DEBUG_CONN = 1; static const uint8_t DEBUG_PROTO = 2; |