diff options
| author | Ted Ross <tross@apache.org> | 2010-03-31 21:13:12 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-03-31 21:13:12 +0000 |
| commit | 2e29faa768283390452b7d432db28d43cd4a27aa (patch) | |
| tree | 521e9711b340a330408245ba699b35d12b36ce9c /cpp/src/qpid/agent | |
| parent | 5e50981ac8a35db09723ad19f5994703d00e10d9 (diff) | |
| download | qpid-python-2e29faa768283390452b7d432db28d43cd4a27aa.tar.gz | |
Merged the changes from the qmf-devel0.7a branch back to the trunk.
This is a checkpoint along the QMFv2 development path.
This update introduces portions of QMFv2 into the code:
- The C++ agent (qpid/agent) uses QMFv2 for data and method transfer
o The APIs no longer use qpid::framing::*
o Consequently, boost is no longer referenced from the API headers.
o Agents and Objects are now referenced by strings, not numbers.
o Schema transfer still uses the QMFv1 format.
- The broker-resident agent can use QMFv1 or QMFv2 based on the command line options.
It defaults to QMFv1 for compatibility.
- The pure-python QMF console (qmf.console) can concurrently interact with both
QMFv1 and QMFv2 agents.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@929716 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 673 | ||||
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 42 |
2 files changed, 475 insertions, 240 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 637645bb04..42bc36c4b8 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -22,7 +22,7 @@ #include "qpid/management/ManagementObject.h" #include "qpid/log/Statement.h" #include "qpid/agent/ManagementAgentImpl.h" -#include "qpid/sys/Mutex.h" +#include "qpid/amqp_0_10/Codecs.h" #include <list> #include <string.h> #include <stdlib.h> @@ -41,6 +41,9 @@ using std::ofstream; using std::ifstream; using std::string; using std::endl; +using qpid::types::Variant; +using qpid::amqp_0_10::MapCodec; +using qpid::amqp_0_10::ListCodec; namespace { Mutex lock; @@ -81,7 +84,7 @@ const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), notifyable(0), inCallback(false), - initialized(false), connected(false), lastFailure("never connected"), + initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), connThreadBody(*this), connThread(connThreadBody), @@ -117,6 +120,21 @@ ManagementAgentImpl::~ManagementAgentImpl() } } +void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + inst = qpid::types::Uuid(true).str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + void ManagementAgentImpl::init(const string& brokerHost, uint16_t brokerPort, uint16_t intervalSeconds, @@ -140,7 +158,7 @@ void ManagementAgentImpl::init(const string& brokerHost, void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, uint16_t intervalSeconds, bool useExternalThread, - const std::string& _storeFile) + const string& _storeFile) { interval = intervalSeconds; extThread = useExternalThread; @@ -157,13 +175,16 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, bootSequence = 1; storeData(true); + if (attrMap.empty()) + setName("vendor", "product"); + initialized = true; } void ManagementAgentImpl::registerClass(const string& packageName, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -173,49 +194,77 @@ void ManagementAgentImpl::registerClass(const string& packageName, void ManagementAgentImpl::registerEvent(const string& packageName, const string& eventName, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } +// old-style add object: 64bit id - deprecated ObjectId ManagementAgentImpl::addObject(ManagementObject* object, uint64_t persistId) { + std::string key; + if (persistId) { + key = boost::lexical_cast<std::string>(persistId); + } + return addObject(object, key, persistId != 0); +} + + +// new style add object - use this approach! +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + const std::string& key, + bool persistent) +{ Mutex::ScopedLock lock(addLock); - uint16_t sequence = persistId ? 0 : bootSequence; - uint64_t objectNum = persistId ? persistId : nextObjectId++; - ObjectId objectId(&attachment, 0, sequence, objectNum); + uint16_t sequence = persistent ? 0 : bootSequence; + + ObjectId objectId(&attachment, 0, sequence); + if (key.empty()) + objectId.setV2Key(*object); // let object generate the key + else + objectId.setV2Key(key); - // TODO: fix object-id handling object->setObjectId(objectId); newManagementObjects[objectId] = object; return objectId; } + void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock(agentLock); Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; stringstream key; key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." << event.getPackageName() << "." << event.getEventName(); - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str()); + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; + string content; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; + + MapCodec::encode(map_, content); + connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -235,8 +284,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) methodQueue.pop_front(); { Mutex::ScopedUnlock unlock(agentLock); - Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size()); - invokeMethodRequest(inBuffer, item->sequence, item->replyTo); + invokeMethodRequest(item->body, item->cid, item->replyTo); delete item; } } @@ -274,20 +322,7 @@ void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) void ManagementAgentImpl::startProtocol() { - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - connected = true; - encodeHeader(buffer, 'A'); - buffer.putShortString("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong(requestedBrokerBank); - buffer.putLong(requestedAgentBank); - uint32_t length = buffer.getPosition(); - buffer.reset(); - connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); - QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << - " reqAgent=" << requestedAgentBank); + sendHeartbeat(); } void ManagementAgentImpl::storeData(bool requested) @@ -323,76 +358,54 @@ void ManagementAgentImpl::retrieveData() } } -void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, - uint32_t code, string text) +void ManagementAgentImpl::sendHeartbeat() { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + static const string addr_exchange("qmf.default.topic"); + static const string addr_key("agent.ind.heartbeat"); - encodeHeader(outBuffer, 'z', sequence); - outBuffer.putLong(code); - outBuffer.putShortString(text); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); - QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); -} + Variant::Map map; + Variant::Map headers; + string content; -void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) -{ - Mutex::ScopedLock lock(agentLock); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; - assignedBrokerBank = inBuffer.getLong(); - assignedAgentBank = inBuffer.getLong(); + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; - QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key); - if ((assignedBrokerBank != requestedBrokerBank) || - (assignedAgentBank != requestedAgentBank)) { - if (requestedAgentBank == 0) { - QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << - assignedAgentBank); - } else { - QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << - "." << assignedAgentBank); - } - storeData(); - requestedBrokerBank = assignedBrokerBank; - requestedAgentBank = assignedAgentBank; - } + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); +} - attachment.setBanks(assignedBrokerBank, assignedAgentBank); +void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid, + const string& text, uint32_t code) +{ + static const string addr_exchange("qmf.default.direct"); - // Bind to qpid.management to receive commands - connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank); + Variant::Map map; + Variant::Map headers; + Variant::Map values; + string content; - // Send package indications for all local packages - for (PackageMap::iterator pIter = packages.begin(); - pIter != packages.end(); - pIter++) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_exception"; + headers["qmf.agent"] = name_address; - encodeHeader(outBuffer, 'p'); - encodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + values["error_code"] = code; + values["error_text"] = text; + map["_values"] = values; - // Send class indications for all local classes - ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { - outBuffer.reset(); - encodeHeader(outBuffer, 'q'); - encodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); - } - } + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey); + + QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); } -void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence) +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo) { Mutex::ScopedLock lock(agentLock); string packageName; @@ -412,12 +425,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc SchemaClass& schema = cIter->second; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; + string body; encodeHeader(outBuffer, 's', sequence); - schema.writeSchemaCall(outBuffer); + schema.writeSchemaCall(body); + outBuffer.putRawData(body); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); } @@ -432,124 +447,250 @@ void ManagementAgentImpl::handleConsoleAddedIndication() QPID_LOG(trace, "RCVD ConsoleAddedInd"); } -void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo) { - string methodName; - string packageName; - string className; - uint8_t hash[16]; - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + string methodName; + bool failed = false; + Variant::Map inMap; + Variant::Map outMap; + Variant::Map::const_iterator oid, mid; + string content; + + MapCodec::decode(body, inMap); + + outMap["_values"] = Variant::Map(); + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); + failed = true; + } else { + string methodName; + ObjectId objId; + Variant::Map inArgs; + Variant::Map callMap; - ObjectId objId(inBuffer); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); + try { + // conversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); - encodeHeader(outBuffer, 'm', sequence); + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT)); - } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { - outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); - } - else - try { - outBuffer.record(); - iter->second->doMethod(methodName, inBuffer, outBuffer); - } catch(exception& e) { - outBuffer.restore(); - outBuffer.putLong(Manageable::STATUS_EXCEPTION); - outBuffer.putMediumString(e.what()); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); + failed = true; + } else { + iter->second->doMethod(methodName, inArgs, callMap); } + + if (callMap["_status_code"].asUint32() == 0) { + outMap["_arguments"] = Variant::Map(); + for (Variant::Map::const_iterator iter = callMap.begin(); + iter != callMap.end(); iter++) + if (iter->first != "_status_code" && iter->first != "_status_text") + outMap["_arguments"].asMap()[iter->first] = iter->second; + } else { + (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"]; + (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"]; + failed = true; + } + + } catch(types::InvalidConversion& e) { + outMap.clear(); + outMap["_values"] = Variant::Map(); + (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + failed = true; + } + } + + Variant::Map headers; + headers["method"] = "response"; + headers["qmf.agent"] = name_address; + if (failed) { + headers["qmf.opcode"] = "_exception"; + QPID_LOG(trace, "SENT Exception map=" << outMap); + } else { + headers["qmf.opcode"] = "_method_response"; + QPID_LOG(trace, "SENT MethodResponse map=" << outMap); } - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + MapCodec::encode(outMap, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo); } -void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo) { - FieldTable ft; - FieldTable::ValuePtr value; - moveNewObjectsLH(); - ft.decode(inBuffer); + Variant::Map inMap; + Variant::Map::const_iterator i; + Variant::Map headers; + + MapCodec::decode(body, inMap); + QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + headers["partial"] = Variant(); + + Variant::List list_; + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + string content; + + /* + * Unpack the _what element of the query. Currently we only support OBJECT queries. + */ + i = inMap.find("_what"); + if (i == inMap.end()) { + sendException(replyTo, cid, "_what element missing in Query"); + return; + } - QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + if (i->second.getType() != qpid::types::VAR_STRING) { + sendException(replyTo, cid, "_what element is not a string"); + return; + } - value = ft.get("_class"); - if (value.get() == 0 || !value->convertsTo<string>()) { - value = ft.get("_objectid"); - if (value.get() == 0 || !value->convertsTo<string>()) - return; + if (i->second.asString() != "OBJECT") { + sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + return; + } + + string className; + string packageName; + + /* + * Handle the _schema_id element, if supplied. + */ + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); + + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + className = s_iter->second.asString(); - ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = managementObjects.find(selector); + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + packageName = s_iter->second.asString(); + } + + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + ObjectId objId(i->second.asMap()); + + ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter != managementObjects.end()) { ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + object->mapEncodeValues(values, true, true); // write both stats and properties + objId.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + headers.erase("partial"); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + return; + } + } else { + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { + + // @todo support multiple object reply per message + values.clear(); + list_.clear(); + oidMap.clear(); + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); - QPID_LOG(trace, "SENT ObjectInd"); + object->mapEncodeValues(values, true, true); // write both stats and properties + iter->first.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + } } - sendCommandComplete(replyTo, sequence); - return; } - string className(value->get<string>()); + // end empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); +} - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName() == className) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; +void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + static const string addr_exchange("qmf.default.direct"); - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); + Variant::Map map; + Variant::Map headers; + string content; - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; - QPID_LOG(trace, "SENT ObjectInd"); - } - } + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + MapCodec::encode(map, content); + connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); - sendCommandComplete(replyTo, sequence); + { + Mutex::ScopedLock lock(agentLock); + clientWasAdded = true; + } } -void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo) { if (extThread) { Mutex::ScopedLock lock(agentLock); - string body; - inBuffer.getRawData(body, inBuffer.available()); - methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); + methodQueue.push_back(new QueuedMethod(cid, replyTo, body)); if (pipeHandle != 0) { pipeHandle->write("X", 1); } else if (notifyable != 0) { @@ -568,7 +709,7 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc inCallback = false; } } else { - invokeMethodRequest(inBuffer, sequence, replyTo); + invokeMethodRequest(body, cid, replyTo); } QPID_LOG(trace, "RCVD MethodRequest"); @@ -576,28 +717,45 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc void ManagementAgentImpl::received(Message& msg) { + string replyToKey; + framing::MessageProperties mp = msg.getMessageProperties(); + if (mp.hasReplyTo()) { + const framing::ReplyTo& rt = mp.getReplyTo(); + replyToKey = rt.getRoutingKey(); + } + + if (mp.hasAppId() && mp.getAppId() == "qmf2") + { + string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode"); + string cid = msg.getMessageProperties().getCorrelationId(); + + if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey); + else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey); + else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey); + else { + QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); + } + return; + } + + // old preV2 binary messages + + uint32_t sequence; string data = msg.getData(); Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); uint8_t opcode; - uint32_t sequence; - string replyToKey; - framing::MessageProperties p = msg.getMessageProperties(); - if (p.hasReplyTo()) { - const framing::ReplyTo& rt = p.getReplyTo(); - replyToKey = rt.getRoutingKey(); - } if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); + if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); - else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); + else + QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); } } + void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet('A'); @@ -607,6 +765,19 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq buf.putLong (seq); } +Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, + const string& cname, + const uint8_t *md5Sum) +{ + Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_hash"] = types::Uuid(md5Sum); + return map_; +} + + bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) @@ -661,7 +832,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind, PackageMap::iterator pIter, const string& className, uint8_t* md5Sum, - qpid::management::ManagementObject::writeSchemaCall_t schemaCall) + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -701,10 +872,7 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf, void ManagementAgentImpl::periodicProcessing() { -#define BUFSIZE 65536 Mutex::ScopedLock lock(agentLock); - char msgChars[BUFSIZE]; - uint32_t contentSize; list<pair<ObjectId, ManagementObject*> > deleteList; if (!connected) @@ -745,42 +913,53 @@ void ManagementAgentImpl::periodicProcessing() !baseObject->isDeleted())) continue; - Buffer msgBuffer(msgChars, BUFSIZE); + Variant::List list_; + for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { - encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_stats || send_props) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + object->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); } if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - - if (msgBuffer.available() < (BUFSIZE / 2)) - break; } } - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - msgBuffer.reset(); - stringstream key; - key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." << - baseObject->getPackageName() << "." << baseObject->getClassName(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); + string content; + ListCodec::encode(list_, content); + if (content.length()) { + Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list"); + QPID_LOG(trace, "SENT DataIndication"); } } @@ -793,18 +972,7 @@ void ManagementAgentImpl::periodicProcessing() } deleteList.clear(); - - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - - contentSize = BUFSIZE - msgBuffer.available(); - msgBuffer.reset(); - connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str()); - } + sendHeartbeat(); } void ManagementAgentImpl::ConnectionThread::run() @@ -831,6 +999,10 @@ void ManagementAgentImpl::ConnectionThread::run() arg::exclusive=true); session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), arg::bindingKey=queueName.str()); + session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(), + arg::bindingKey=agent.name_address); + session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(), + arg::bindingKey="console.#"); subscriptions->subscribe(agent, queueName.str(), dest); QPID_LOG(info, "Connection established with broker"); @@ -839,6 +1011,7 @@ void ManagementAgentImpl::ConnectionThread::run() if (shutdown) return; operational = true; + agent.connected = true; agent.startProtocol(); try { Mutex::ScopedUnlock _unlock(connLock); @@ -892,6 +1065,48 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, const string& exchange, const string& routingKey) { + Message msg; + string data; + + buf.getRawData(data, length); + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + +void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data, + const string& cid, + const Variant::Map headers, + const string& exchange, + const string& routingKey, + const string& contentType) +{ + Message msg; + Variant::Map::const_iterator i; + + if (!cid.empty()) + msg.getMessageProperties().setCorrelationId(cid); + + if (!contentType.empty()) + msg.getMessageProperties().setContentType(contentType); + for (i = headers.begin(); i != headers.end(); ++i) { + msg.getHeaders().setString(i->first, i->second.asString()); + } + msg.getHeaders().setString("app_id", "qmf2"); + + msg.setData(data); + sendMessage(msg, exchange, routingKey); +} + + + + + +void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, + const string& exchange, + const string& routingKey) +{ ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); @@ -900,23 +1115,21 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, s = subscriptions; } - Message msg; - string data; - - buf.getRawData(data, length); msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData(data); + msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address); try { session.messageTransfer(arg::content=msg, arg::destination=exchange); } catch(exception& e) { - QPID_LOG(error, "Exception caught in sendBuffer: " << e.what()); + QPID_LOG(error, "Exception caught in sendMessage: " << e.what()); // Bounce the connection if (s) s->stop(); } } + + void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) { stringstream key; diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index affaa45d2d..d1609341be 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -51,6 +51,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen // Methods from ManagementAgent // int getMaxThreads() { return 1; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); void init(const std::string& brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, @@ -75,6 +78,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0); + ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key, + bool persistent); void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT); uint32_t pollCallbacks(uint32_t callLimit = 0); int getSignalFd(); @@ -120,10 +125,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen }; struct QueuedMethod { - QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) : - sequence(_seq), replyTo(_reply), body(_body) {} + QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) : + cid(_cid), replyTo(_reply), body(_body) {} - uint32_t sequence; + std::string cid; std::string replyTo; std::string body; }; @@ -140,6 +145,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void received (client::Message& msg); + qpid::types::Variant::Map attrMap; + std::string name_address; uint16_t interval; bool extThread; sys::PipeHandle* pipeHandle; @@ -155,6 +162,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen client::ConnectionSettings connectionSettings; bool initialized; bool connected; + bool useMapMsg; std::string lastFailure; bool clientWasAdded; @@ -198,6 +206,15 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint32_t length, const std::string& exchange, const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map headers, + const std::string& exchange, + const std::string& routingKey, + const std::string& contentType="amqp/map"); + void sendMessage(qpid::client::Message msg, + const std::string& exchange, + const std::string& routingKey); void bindToBank(uint32_t brokerBank, uint32_t agentBank); void close(); bool isSleeping() const; @@ -237,16 +254,21 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen PackageMap::iterator pIter, ClassMap::iterator cIter); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const uint8_t *md5Sum); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, - uint32_t code = 0, std::string text = std::string("OK")); - void handleAttachResponse (qpid::framing::Buffer& inBuffer); + void sendHeartbeat(); + void sendException(const std::string& replyToKey, const std::string& cid, + const std::string& text, uint32_t code=1); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); - void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); - void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); - void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); - void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo); + void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo); + + void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo); + void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); + void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleConsoleAddedIndication(); }; |
