diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 233 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 8 |
2 files changed, 138 insertions, 103 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 351e0bfd00..d431e4ca16 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -88,6 +88,8 @@ ManagementAgentImpl::ManagementAgentImpl() : interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), notifyable(0), inCallback(false), initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"), + topicExchange("qmf.default.topic"), directExchange("qmf.default.direct"), + schemaTimestamp(Duration(EPOCH, now())), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), connThreadBody(*this), connThread(connThreadBody), @@ -296,7 +298,8 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), event.getEventName(), - event.getMd5Sum()); + event.getMd5Sum(), + ManagementItem::CLASS_KIND_EVENT); event.mapEncode(values); map_["_values"] = values; map_["_timestamp"] = uint64_t(Duration(EPOCH, now())); @@ -308,7 +311,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se headers["qmf.agent"] = name_address; MapCodec::encode(map_, content); - connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str()); + connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -404,7 +407,6 @@ void ManagementAgentImpl::retrieveData() void ManagementAgentImpl::sendHeartbeat() { - static const string addr_exchange("qmf.default.topic"); static const string addr_key_base("agent.ind.heartbeat"); Variant::Map map; @@ -429,13 +431,9 @@ void ManagementAgentImpl::sendHeartbeat() headers["qmf.opcode"] = "_agent_heartbeat_indication"; headers["qmf.agent"] = name_address; - map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - + getHeartbeatContent(map); MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key.str()); + connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str()); QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } @@ -443,8 +441,6 @@ void ManagementAgentImpl::sendHeartbeat() void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid, const string& text, uint32_t code) { - static const string addr_exchange("qmf.default.direct"); - Variant::Map map; Variant::Map headers; Variant::Map values; @@ -459,7 +455,7 @@ void ManagementAgentImpl::sendException(const string& replyToKey, const string& map["_values"] = values; MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey); QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); } @@ -573,7 +569,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& headers["qmf.opcode"] = "_method_response"; QPID_LOG(trace, "SENT MethodResponse map=" << outMap); MapCodec::encode(outMap, content); - connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo); } } @@ -590,7 +586,6 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; headers["partial"] = Variant(); @@ -614,80 +609,25 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, return; } - if (i->second.asString() != "OBJECT") { - sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); - return; - } - - string className; - string packageName; - - /* - * Handle the _schema_id element, if supplied. - */ - i = inMap.find("_schema_id"); - if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { - const Variant::Map& schemaIdMap(i->second.asMap()); - - Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); - if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) - className = s_iter->second.asString(); - - s_iter = schemaIdMap.find("_package_name"); - if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) - packageName = s_iter->second.asString(); - } - - /* - * Unpack the _object_id element of the query if it is present. If it is present, find that one - * object and return it. If it is not present, send a class-based result. - */ - i = inMap.find("_object_id"); - if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { - ObjectId objId(i->second.asMap()); - - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - object->mapEncodeValues(values, true, true); // write both stats and properties - objId.mapEncode(oidMap); - map_["_values"] = values; - map_["_object_id"] = oidMap; - object->writeTimestamps(map_); - map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), - object->getClassName(), - object->getMd5Sum()); - - list_.push_back(map_); - headers.erase("partial"); - - ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); - return; - } - } else { - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName() == className && - (packageName.empty() || object->getPackageName() == packageName)) { + if (i->second.asString() == "OBJECT") { + headers["qmf.content"] = "_data"; + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + ObjectId objId(i->second.asMap()); - // @todo support multiple object reply per message - values.clear(); - list_.clear(); - oidMap.clear(); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); object->mapEncodeValues(values, true, true); // write both stats and properties - iter->first.mapEncode(oidMap); + objId.mapEncode(oidMap); map_["_values"] = values; map_["_object_id"] = oidMap; object->writeTimestamps(map_); @@ -695,26 +635,101 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, object->getClassName(), object->getMd5Sum()); list_.push_back(map_); + headers.erase("partial"); ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + return; + } + } else { // match using schema_id, if supplied + + string className; + string packageName; + + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); + + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + className = s_iter->second.asString(); + + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + packageName = s_iter->second.asString(); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { + + // @todo support multiple object reply per message + values.clear(); + list_.clear(); + oidMap.clear(); + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + object->mapEncodeValues(values, true, true); // write both stats and properties + iter->first.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + object->writeTimestamps(map_); + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + } + } } } - } - // Send empty "non-partial" message to indicate CommandComplete - list_.clear(); - headers.erase("partial"); - ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); + // Send empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); + + } else if (i->second.asString() == "SCHEMA_ID") { + headers["qmf.content"] = "_schema_id"; + /** + * @todo - support for a predicate. For now, send a list of all known schema class keys. + */ + for (PackageMap::iterator pIter = packages.begin(); + pIter != packages.end(); pIter++) { + for (ClassMap::iterator cIter = pIter->second.begin(); + cIter != pIter->second.end(); cIter++) { + + list_.push_back(mapEncodeSchemaId( pIter->first, + cIter->first.name, + cIter->first.hash, + cIter->second.kind )); + } + } + + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo); + + } else { + // Unknown query target + sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + } } void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) { QPID_LOG(trace, "RCVD AgentLocateRequest"); - static const string addr_exchange("qmf.default.direct"); Variant::Map map; Variant::Map headers; @@ -724,13 +739,9 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, headers["qmf.opcode"] = "_agent_locate_response"; headers["qmf.agent"] = name_address; - map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - + getHeartbeatContent(map); MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo); QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); @@ -822,13 +833,19 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, const string& cname, - const uint8_t *md5Sum) + const uint8_t *md5Sum, + uint8_t type) { Variant::Map map_; map_["_package_name"] = pname; map_["_class_name"] = cname; map_["_hash"] = types::Uuid(md5Sum); + if (type == ManagementItem::CLASS_KIND_EVENT) + map_["_type"] = "_event"; + else + map_["_type"] = "_data"; + return map_; } @@ -901,6 +918,8 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind, // No such class found, create a new class with local information. cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind))); + schemaTimestamp = Duration(EPOCH, now()); + QPID_LOG(trace, "Updated schema timestamp, now=" << uint64_t(schemaTimestamp)); } void ManagementAgentImpl::encodePackageIndication(Buffer& buf, @@ -1014,7 +1033,7 @@ void ManagementAgentImpl::periodicProcessing() headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list"); + connThreadBody.sendBuffer(content, "", headers, topicExchange, "agent.ind.data", "amqp/list"); QPID_LOG(trace, "SENT DataIndication"); } } @@ -1031,6 +1050,16 @@ void ManagementAgentImpl::periodicProcessing() sendHeartbeat(); } + +void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map) +{ + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + map["_values"].asMap()["epoch"] = bootSequence; + map["_values"].asMap()["schema_timestamp"] = uint64_t(schemaTimestamp); +} + void ManagementAgentImpl::ConnectionThread::run() { static const int delayMin(1); @@ -1055,9 +1084,9 @@ void ManagementAgentImpl::ConnectionThread::run() arg::exclusive=true); session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), arg::bindingKey=queueName.str()); - session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(), + session.exchangeBind(arg::exchange=agent.directExchange, arg::queue=queueName.str(), arg::bindingKey=agent.name_address); - session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(), + session.exchangeBind(arg::exchange=agent.topicExchange, arg::queue=queueName.str(), arg::bindingKey="console.#"); subscriptions->subscribe(agent, queueName.str(), dest); diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 4a58807e98..32cbbd7e08 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -32,6 +32,7 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/PipeHandle.h" +#include "qpid/sys/Time.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> @@ -166,6 +167,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen bool connected; bool useMapMsg; std::string lastFailure; + std::string topicExchange; + std::string directExchange; + qpid::sys::Duration schemaTimestamp; bool clientWasAdded; uint32_t requestedBrokerBank; @@ -258,7 +262,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname, const std::string& cname, - const uint8_t *md5Sum); + const uint8_t *md5Sum, + uint8_t type=ManagementItem::CLASS_KIND_TABLE); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void sendHeartbeat(); void sendException(const std::string& replyToKey, const std::string& cid, @@ -272,6 +277,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleConsoleAddedIndication(); + void getHeartbeatContent (qpid::types::Variant::Map& map); }; }} |