diff options
| author | Ted Ross <tross@apache.org> | 2010-03-31 21:13:12 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-03-31 21:13:12 +0000 |
| commit | 2e29faa768283390452b7d432db28d43cd4a27aa (patch) | |
| tree | 521e9711b340a330408245ba699b35d12b36ce9c /cpp/src/qpid/management | |
| parent | 5e50981ac8a35db09723ad19f5994703d00e10d9 (diff) | |
| download | qpid-python-2e29faa768283390452b7d432db28d43cd4a27aa.tar.gz | |
Merged the changes from the qmf-devel0.7a branch back to the trunk.
This is a checkpoint along the QMFv2 development path.
This update introduces portions of QMFv2 into the code:
- The C++ agent (qpid/agent) uses QMFv2 for data and method transfer
o The APIs no longer use qpid::framing::*
o Consequently, boost is no longer referenced from the API headers.
o Agents and Objects are now referenced by strings, not numbers.
o Schema transfer still uses the QMFv1 format.
- The broker-resident agent can use QMFv1 or QMFv2 based on the command line options.
It defaults to QMFv1 for compatibility.
- The pure-python QMF console (qmf.console) can concurrently interact with both
QMFv1 and QMFv2 agents.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@929716 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 1405 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 224 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.cpp | 15 |
5 files changed, 1451 insertions, 256 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 4454d70427..bc62588f5d 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -29,20 +29,46 @@ #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/AclModule.h" +#include "qpid/types/Variant.h" +#include "qpid/types/Uuid.h" +#include "qpid/framing/List.h" +#include "qpid/amqp_0_10/Codecs.h" #include <list> #include <iostream> #include <fstream> #include <sstream> +#include <typeinfo> using boost::intrusive_ptr; using qpid::framing::Uuid; +using qpid::types::Variant; +using qpid::amqp_0_10::MapCodec; +using qpid::amqp_0_10::ListCodec; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::broker; using namespace qpid::sys; +using namespace qpid; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; + + +static Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const std::string& type, + const uint8_t *md5Sum) +{ + Variant::Map map_; + + map_["_package_name"] = pname; + map_["_class_name"] = cname; + map_["_type"] = type; + map_["_hash"] = qpid::types::Uuid(md5Sum); + return map_; +} + + ManagementAgent::RemoteAgent::~RemoteAgent () { QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); @@ -52,10 +78,11 @@ ManagementAgent::RemoteAgent::~RemoteAgent () } } -ManagementAgent::ManagementAgent () : +ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : threadPoolSize(1), interval(10), broker(0), timer(0), startTime(uint64_t(Duration(now()))), - suppressed(false) + suppressed(false), + qmf1Support(qmfV1), qmf2Support(qmfV2) { nextObjectId = 1; brokerBank = 1; @@ -148,6 +175,27 @@ void ManagementAgent::pluginsInitialized() { timer->add(new Periodic(*this, interval)); } + +void ManagementAgent::setName(const string& vendor, const string& product, const string& instance) +{ + attrMap["_vendor"] = vendor; + attrMap["_product"] = product; + string inst; + if (instance.empty()) { + if (uuid.isNull()) + { + throw Exception("ManagementAgent::configure() must be called if default name is used."); + } + inst = uuid.str(); + } else + inst = instance; + + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; +} + + void ManagementAgent::writeData () { string filename (dataDir + "/.mbrokerdata"); @@ -194,6 +242,7 @@ void ManagementAgent::registerEvent (const string& packageName, addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } +// Deprecated: V1 objects ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId) { uint16_t sequence; @@ -207,8 +256,47 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId objectNum = persistId; } - ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); - objId.setV2Key(*object); + ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum); + objId.setV2Key(*object); // let object generate the v2 key + + object->setObjectId(objId); + + { + Mutex::ScopedLock lock (addLock); + ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); + if (destIter != newManagementObjects.end()) { + if (destIter->second->isDeleted()) { + newDeletedManagementObjects.push_back(destIter->second); + newManagementObjects.erase(destIter); + } else { + QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() << + " key=" << objId.getV2Key()); + return objId; + } + } + newManagementObjects[objId] = object; + } + + return objId; +} + + + +ObjectId ManagementAgent::addObject(ManagementObject* object, + const std::string& key, + bool persistent) +{ + uint16_t sequence; + + sequence = persistent ? 0 : bootSequence; + + ObjectId objId(0 /*flags*/, sequence, brokerBank); + if (key.empty()) { + objId.setV2Key(*object); // let object generate the key + } else { + objId.setV2Key(key); + } + object->setObjectId(objId); { @@ -233,21 +321,57 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock (userLock); - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, mExchange, - "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); + if (qmf1Support) { + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + outBuffer.putOctet(sev); + std::string sBuf; + event.encode(sBuf); + outBuffer.putRawData(sBuf); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, + "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); + QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); + } + + if (qmf2Support) { + Variant::Map map_; + Variant::Map schemaId; + Variant::Map values; + Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), + event.getEventName(), + "_event", + event.getMd5Sum()); + event.mapEncode(values); + map_["_values"] = values; + map_["_timestamp"] = uint64_t(Duration(now())); + map_["_severity"] = sev; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_event"; + headers["qmf.agent"] = name_address; + + stringstream key; + key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName(); + + string content; + MapCodec::encode(map_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); + } + } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) @@ -355,6 +479,59 @@ void ManagementAgent::sendBuffer(Buffer& buf, } catch(exception&) {} } + +void ManagementAgent::sendBuffer(const std::string& data, + const std::string& cid, + const Variant::Map& headers, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey) +{ + Variant::Map::const_iterator i; + + if (suppressed) { + QPID_LOG(trace, "Suppressed management message to " << routingKey); + return; + } + if (exchange.get() == 0) return; + + intrusive_ptr<Message> msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody(data))); + + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(data.length()); + if (!cid.empty()) { + props->setCorrelationId(cid); + } + + for (i = headers.begin(); i != headers.end(); ++i) { + msg->getOrInsertHeaders().setString(i->first, i->second.asString()); + } + msg->getOrInsertHeaders().setString("app_id", "qmf2"); + + DeliveryProperties* dp = + msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + dp->setRoutingKey(routingKey); + + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + try { + exchange->route(deliverable, routingKey, 0); + } catch(exception&) {} +} + + void ManagementAgent::moveNewObjectsLH() { Mutex::ScopedLock lock (addLock); @@ -391,12 +568,13 @@ void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 #define HEADROOM 4096 - QPID_LOG(trace, "Management agent periodic processing") - Mutex::ScopedLock lock (userLock); + QPID_LOG(trace, "Management agent periodic processing"); + Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; + std::string sBuf; uint64_t uptime = uint64_t(Duration(now())) - startTime; static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); @@ -439,43 +617,90 @@ void ManagementAgent::periodicProcessing (void) continue; Buffer msgBuffer(msgChars, BUFSIZE); + Variant::List list_; + for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; + bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { + send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()); + send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); + + if (send_props && qmf1Support) { encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - pcount++; + sBuf.clear(); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + + if (send_stats && qmf1Support) { encodeHeader(msgBuffer, 'i'); - object->writeStatistics(msgBuffer); - scount++; + sBuf.clear(); + object->writeStatistics(sBuf); + msgBuffer.putRawData(sBuf); } + if ((send_stats || send_props) && qmf2Support) { + Variant::Map map_; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->mapEncodeValues(values, send_props, send_stats); + map_["_values"] = values; + list_.push_back(map_); + + } + + if (send_props) pcount++; + if (send_stats) scount++; + if (object->isDeleted()) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - if (msgBuffer.available() < HEADROOM) + if (qmf1Support && (msgBuffer.available() < HEADROOM)) break; } } - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - msgBuffer.reset(); - stringstream key; - key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + if (pcount || scount) { + if (qmf1Support) { + contentSize = BUFSIZE - msgBuffer.available(); + if (contentSize > 0) { + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } + + if (qmf2Support) { + string content; + ListCodec::encode(list_, content); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName(); + // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + } + } } } @@ -492,15 +717,49 @@ void ManagementAgent::periodicProcessing (void) for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin(); cdIter != deletedManagementObjects.end(); cdIter++) { collisionDeletions = true; - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'c'); - (*cdIter)->writeProperties(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - stringstream key; - key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + { + if (qmf1Support) { + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'c'); + sBuf.clear(); + (*cdIter)->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + stringstream key; + key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + sendBuffer (msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + + if (qmf2Support) { + Variant::List list_; + Variant::Map map_; + Variant::Map values; + Variant::Map headers; + + map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(), + (*cdIter)->getClassName(), + "_data", + (*cdIter)->getMd5Sum()); + (*cdIter)->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + stringstream key; + key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName(); + + string content; + ListCodec::encode(list_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str()); + } + } } if (!deleteList.empty() || collisionDeletions) { @@ -508,7 +767,12 @@ void ManagementAgent::periodicProcessing (void) deleteOrphanedAgentsLH(); } - { + // heartbeat generation + + if (qmf1Support) { +#define BUFSIZE 65536 + uint32_t contentSize; + char msgChars[BUFSIZE]; Buffer msgBuffer(msgChars, BUFSIZE); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); @@ -519,6 +783,27 @@ void ManagementAgent::periodicProcessing (void) sendBuffer (msgBuffer, contentSize, mExchange, routingKey); QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey); } + + if (qmf2Support) { + static const string addr_key("agent.ind.heartbeat"); + + Variant::Map map; + Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_heartbeat_indication"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + string content; + MapCodec::encode(map, content); + sendBuffer(content, "", headers, v2Topic, addr_key); + + QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); + } QPID_LOG(debug, "periodic update " << debugSnapshot()); } @@ -531,19 +816,51 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) if (!object->isDeleted()) return; + if (qmf1Support) { #define DNOW_BUFSIZE 2048 - char msgChars[DNOW_BUFSIZE]; - uint32_t contentSize; - Buffer msgBuffer(msgChars, DNOW_BUFSIZE); - - encodeHeader(msgBuffer, 'c'); - object->writeProperties(msgBuffer); - contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); - stringstream key; - key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); - sendBuffer(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + char msgChars[DNOW_BUFSIZE]; + uint32_t contentSize; + Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + std::string sBuf; + + encodeHeader(msgBuffer, 'c'); + object->writeProperties(sBuf); + msgBuffer.putRawData(sBuf); + contentSize = msgBuffer.getPosition(); + msgBuffer.reset(); + stringstream key; + key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); + sendBuffer(msgBuffer, contentSize, mExchange, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + } + + if (qmf2Support) { + Variant::List list_; + Variant::Map map_; + Variant::Map values; + + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + "_data", + object->getMd5Sum()); + object->mapEncodeValues(values, true, false); + map_["_values"] = values; + list_.push_back(map_); + + stringstream key; + key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName(); + + Variant::Map headers; + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + string content; + ListCodec::encode(list_, content); + sendBuffer(content, "", headers, v2Topic, key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + } managementObjects.erase(oid); } @@ -566,35 +883,68 @@ void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, - const FieldTable* /*args*/) + const FieldTable* /*args*/, + const bool topic) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - // Parse the routing key. This management broker should act as though it - // is bound to the exchange to match the following keys: - // - // agent.1.0.# - // broker - // schema.# + if (qmf1Support && topic) { - if (routingKey == "broker") { - dispatchAgentCommandLH(msg); - return false; - } + // qmf1 is bound only to the topic management exchange. + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.1.0.# + // broker + // schema.# - else if (routingKey.compare(0, 9, "agent.1.0") == 0) { - dispatchAgentCommandLH(msg); - return false; - } + if (routingKey == "broker") { + dispatchAgentCommandLH(msg); + return false; + } + + if (routingKey.length() > 6) { + + if (routingKey.compare(0, 9, "agent.1.0") == 0) { + dispatchAgentCommandLH(msg); + return false; + } + + if (routingKey.compare(0, 8, "agent.1.") == 0) { + return authorizeAgentMessageLH(msg); + } - else if (routingKey.compare(0, 8, "agent.1.") == 0) { - return authorizeAgentMessageLH(msg); + if (routingKey.compare(0, 7, "schema.") == 0) { + dispatchAgentCommandLH(msg); + return true; + } + } } - else if (routingKey.compare(0, 7, "schema.") == 0) { - dispatchAgentCommandLH(msg); - return true; + if (qmf2Support) { + + if (topic) { + + // Intercept messages bound to: + // "console.ind.locate.# - process these messages, and also allow them to be forwarded. + + if (routingKey.compare(0, 18, "console.ind.locate") == 0) { + dispatchAgentCommandLH(msg); + return true; + } + + } else { // direct exchange + + // Intercept messages bound to: + // "broker" - generic alias for the local broker + // "<name_address>" - the broker agent's proper name + // and do not forward them futher + if (routingKey == "broker" || routingKey == name_address) { + dispatchAgentCommandLH(msg); + return false; + } + } } return true; @@ -610,14 +960,19 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; AclModule* acl = broker->getAcl(); + std::string inArgs; - ObjectId objId(inBuffer); + std::string sBuf; + inBuffer.getRawData(sBuf, 16); + ObjectId objId; + objId.decode(sBuf); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); inBuffer.getShortString(methodName); + inBuffer.getRawData(inArgs, inBuffer.available()); - QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << + QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << methodName << " replyTo=" << replyToKey); encodeHeader(outBuffer, 'm', sequence); @@ -629,8 +984,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence) - return; + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); + return; } if (acl != 0) { @@ -645,8 +1000,8 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - return; + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + return; } } @@ -664,7 +1019,9 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey try { outBuffer.record(); Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inBuffer, outBuffer); + std::string outBuf; + iter->second->doMethod(methodName, inArgs, outBuf); + outBuffer.putRawData(outBuf); } catch(exception& e) { outBuffer.restore(); outBuffer.putLong(Manageable::STATUS_EXCEPTION); @@ -675,9 +1032,135 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } + +void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo, + const std::string& cid, const ConnectionToken* connToken) +{ + string methodName; + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator oid, mid; + string content; + + Variant::Map outMap; + Variant::Map headers; + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.agent"] = name_address; + + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) + { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid); + return; + } + + ObjectId objId; + Variant::Map inArgs; + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + + mid = inMap.find("_arguments"); + if (mid != inMap.end()) { + inArgs = (mid->second).asMap(); + } + } catch(exception& e) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid); + return; + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + + if (iter == managementObjects.end() || iter->second->isDeleted()) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid); + return; + } + + // validate + AclModule* acl = broker->getAcl(); + DisallowedMethods::const_iterator i; + + i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName)); + if (i != disallowed.end()) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + (outMap["_values"].asMap())["_status_text"] = i->second; + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid); + return; + } + + if (acl != 0) { + string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); + map<acl::Property, string> params; + params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); + params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid); + return; + } + } + + // invoke the method + + QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() + << ":" << iter->second->getClassName() << " method=" << + methodName << " replyTo=" << replyTo); + + try { + iter->second->doMethod(methodName, inArgs, outMap); + } catch(exception& e) { + outMap.clear(); + headers["qmf.opcode"] = "_exception"; + (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION; + (outMap["_values"].asMap())["_status_text"] = e.what(); + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid); + return; + } + + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid); +} + + void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -781,6 +1264,7 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uin uint32_t outLen; uint32_t sequence = nextRequestSequence++; + // Schema Request encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); key.encode(outBuffer); @@ -803,9 +1287,11 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) // linked in via plug-in), call the schema handler directly. If the package // is from a remote management agent, send the stored schema information. - if (writeSchemaCall != 0) - writeSchemaCall(buf); - else + if (writeSchemaCall != 0) { + std::string schema; + writeSchemaCall(schema); + buf.putRawData(schema); + } else buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } @@ -981,7 +1467,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data()); agent->mgmtObject->set_brokerBank (brokerBank); agent->mgmtObject->set_agentBank (assignedBank); addObject (agent->mgmtObject, 0); @@ -1012,7 +1498,7 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin ft.decode(inBuffer); - QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence); + QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); value = ft.get("_class"); if (value.get() == 0 || !value->convertsTo<string>()) { @@ -1031,13 +1517,17 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin object->setUpdateTime(); if (!object->isDeleted()) { + std::string sBuf; encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } sendCommandComplete(replyToKey, sequence); @@ -1058,13 +1548,17 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin object->setUpdateTime(); if (!object->isDeleted()) { + std::string sBuf; encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); + object->writeProperties(sBuf); + outBuffer.putRawData(sBuf); + sBuf.clear(); + object->writeStatistics(sBuf, true); + outBuffer.putRawData(sBuf); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } } @@ -1072,64 +1566,285 @@ void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uin sendCommandComplete(replyToKey, sequence); } + +void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + if (contentType != "_query_v1") { + QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!"); + return; + } + + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator i; + Variant::Map headers; + + QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); + + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + headers["partial"]; + + Variant::List list_; + Variant::Map map_; + Variant::Map values; + string className; + string content; + + i = inMap.find("_class"); + if (i != inMap.end()) + try { + className = i->second.asString(); + } catch(exception& e) { + className.clear(); + QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored."); + } + + if (className.empty()) { + ObjectId objId; + i = inMap.find("_object_id"); + if (i != inMap.end()) { + + try { + objId = ObjectId(i->second.asMap()); + } catch (exception &e) { + objId = ObjectId(); // empty object id - won't find a match (I hope). + QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid); + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + } + } + } + } else { + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName () == className) { + + // @todo: support multiple objects per message reply + values.clear(); + list_.clear(); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + object->mapEncodeValues(values, true, true); // write both stats and properties + map_["_values"] = values; + list_.push_back(map_); + + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + } + } + } + } + + // end empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + ListCodec::encode(list_, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid); +} + + +void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo, + const string& cid) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest"); + + Variant::Map map; + Variant::Map headers; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_agent_locate_response"; + headers["qmf.agent"] = name_address; + + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + + string content; + MapCodec::encode(map, content); + sendBuffer(content, cid, headers, v2Direct, replyTo); + + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); +} + + bool ManagementAgent::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; - string replyToKey; + uint32_t sequence = 0; + bool methodReq = false; + bool mapMsg = false; + string packageName; + string className; + string methodName; + std::string cid; if (msg.encodedSize() > MA_BUFFER_SIZE) return false; msg.encodeContent(inBuffer); + uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); - if (!checkHeader(inBuffer, &opcode, &sequence)) - return false; + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + + const framing::FieldTable *headers = msg.getApplicationHeaders(); + + if (headers && headers->getAsString("app_id") == "qmf2") + { + mapMsg = true; + + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } + + if (headers->getAsString("qmf.opcode") == "_method_request") + { + methodReq = true; + + // extract object id and method name + + std::string body; + inBuffer.getRawData(body, bufferLen); + Variant::Map inMap; + MapCodec::decode(body, inMap); + Variant::Map::const_iterator oid, mid; + + ObjectId objId; - if (opcode == 'M') { + if ((oid = inMap.find("_object_id")) == inMap.end() || + (mid = inMap.find("_method_name")) == inMap.end()) { + QPID_LOG(warning, + "Missing fields in QMF authorize req received."); + return false; + } + + try { + // coversions will throw if input is invalid. + objId = ObjectId(oid->second.asMap()); + methodName = mid->second.getString(); + } catch(exception& e) { + QPID_LOG(warning, + "Badly formatted QMF authorize req received."); + return false; + } + + // look up schema for object to get package and class name + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + + if (iter == managementObjects.end() || iter->second->isDeleted()) { + QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " << + objId); + return false; + } + + packageName = iter->second->getPackageName(); + className = iter->second->getClassName(); + } + } else { // old style binary message format + + uint8_t opcode; + + if (!checkHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + methodReq = true; + + // extract method name & schema package and class name + + uint8_t hash[16]; + inBuffer.getLongLong(); // skip over object id + inBuffer.getLongLong(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + } + } + + if (methodReq) { // TODO: check method call against ACL list. + map<acl::Property, string> params; AclModule* acl = broker->getAcl(); if (acl == 0) return true; string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId(); - string packageName; - string className; - uint8_t hash[16]; - string methodName; - - map<acl::Property, string> params; - ObjectId objId(inBuffer); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); - params[acl::PROP_SCHEMAPACKAGE] = packageName; params[acl::PROP_SCHEMACLASS] = className; if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) return true; + // authorization failed, send reply if replyTo present + const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); - replyToKey = rt.getRoutingKey(); + string replyToKey = rt.getRoutingKey(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + if (mapMsg) { - encodeHeader(outBuffer, 'm', sequence); - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence) - } + Variant::Map outMap; + Variant::Map headers; + + headers["method"] = "response"; + headers["qmf.opcode"] = "_method_response"; + headers["qmf.agent"] = name_address; + + ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN; + ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN); + + string content; + MapCodec::encode(outMap, content); + sendBuffer(content, cid, headers, v2Direct, replyToKey); + + } else { + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + + QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); + } return false; } @@ -1139,9 +1854,6 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) void ManagementAgent::dispatchAgentCommandLH(Message& msg) { - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; string replyToKey; const framing::MessageProperties* p = @@ -1153,6 +1865,9 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) else return; + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + if (msg.encodedSize() > MA_BUFFER_SIZE) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.encodedSize()); @@ -1163,7 +1878,36 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + const framing::FieldTable *headers = msg.getApplicationHeaders(); + + if (headers && headers->getAsString("app_id") == "qmf2") + { + std::string opcode = headers->getAsString("qmf.opcode"); + std::string contentType = headers->getAsString("qmf.content"); + std::string body; + std::string cid; + + inBuffer.getRawData(body, bufferLen); + + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } + + if (opcode == "_method_request") + return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher()); + else if (opcode == "_query_request") + return handleGetQueryLH(body, replyToKey, cid, contentType); + else if (opcode == "_agent_locate_request") + return handleLocateRequestLH(body, replyToKey, cid); + + QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); + return; + } + + // old preV2 binary messages + while (inBuffer.getPosition() < bufferLen) { + uint32_t sequence; if (!checkHeader(inBuffer, &opcode, &sequence)) return; @@ -1359,7 +2103,6 @@ ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) return iter; } - void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { Mutex::ScopedLock lock (userLock); @@ -1377,42 +2120,64 @@ void ManagementAgent::disallow(const std::string& className, const std::string& disallowed[std::make_pair(className, methodName)] = message; } +void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const { + _map["_cname"] = name; + _map["_hash"] = qpid::types::Uuid(hash); +} + +void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) { + Variant::Map::const_iterator i; + + if ((i = _map.find("_cname")) != _map.end()) { + name = i->second.asString(); + } + + if ((i = _map.find("_hash")) != _map.end()) { + const qpid::types::Uuid& uuid = i->second.asUuid(); + memcpy(hash, uuid.data(), uuid.size()); + } +} + void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const { - buffer.checkAvailable(encodedSize()); + buffer.checkAvailable(encodedBufSize()); buffer.putShortString(name); buffer.putBin128(hash); } void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) { - buffer.checkAvailable(encodedSize()); + buffer.checkAvailable(encodedBufSize()); buffer.getShortString(name); buffer.getBin128(hash); } -uint32_t ManagementAgent::SchemaClassKey::encodedSize() const { +uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const { return 1 + name.size() + 16 /* bin128 */; } -void ManagementAgent::SchemaClass::encode(qpid::framing::Buffer& outBuf) const { - outBuf.checkAvailable(encodedSize()); - outBuf.putOctet(kind); - outBuf.putLong(pendingSequence); - outBuf.putLongString(data); +void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const { + _map["_type"] = kind; + _map["_pending_sequence"] = pendingSequence; + _map["_data"] = data; } -void ManagementAgent::SchemaClass::decode(qpid::framing::Buffer& inBuf) { - inBuf.checkAvailable(encodedSize()); - kind = inBuf.getOctet(); - pendingSequence = inBuf.getLong(); - inBuf.getLongString(data); -} +void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) { + Variant::Map::const_iterator i; -uint32_t ManagementAgent::SchemaClass::encodedSize() const { - return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size(); + if ((i = _map.find("_type")) != _map.end()) { + kind = i->second; + } + if ((i = _map.find("_pending_sequence")) != _map.end()) { + pendingSequence = i->second; + } + if ((i = _map.find("_data")) != _map.end()) { + data = i->second.asString(); + } } void ManagementAgent::exportSchemas(std::string& out) { - out.clear(); + Variant::List list_; + Variant::Map map_, kmap, cmap; + for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) { string name = i->first; const ClassMap& classes = i ->second; @@ -1421,90 +2186,143 @@ void ManagementAgent::exportSchemas(std::string& out) { const SchemaClass& klass = j->second; if (klass.writeSchemaCall == 0) { // Ignore built-in schemas. // Encode name, schema-key, schema-class - size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize(); - size_t end = out.size(); - out.resize(end + encodedSize); - framing::Buffer outBuf(&out[end], encodedSize); - outBuf.putShortString(name); - key.encode(outBuf); - klass.encode(outBuf); + + map_.clear(); + kmap.clear(); + cmap.clear(); + + key.mapEncode(kmap); + klass.mapEncode(cmap); + + map_["_pname"] = name; + map_["_key"] = kmap; + map_["_class"] = cmap; + list_.push_back(map_); } } } + + ListCodec::encode(list_, out); } void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) { - while (inBuf.available()) { + + string buf(inBuf.getPointer(), inBuf.available()); + Variant::List content; + ListCodec::decode(buf, content); + Variant::List::const_iterator l; + + + for (l = content.begin(); l != content.end(); l++) { string package; SchemaClassKey key; SchemaClass klass; - inBuf.getShortString(package); - key.decode(inBuf); - klass.decode(inBuf); - packages[package][key] = klass; + Variant::Map map_, kmap, cmap; + Variant::Map::const_iterator i; + + map_ = l->asMap(); + + if ((i = map_.find("_pname")) != map_.end()) { + package = i->second.asString(); + + if ((i = map_.find("_key")) != map_.end()) { + key.mapDecode(i->second.asMap()); + + if ((i = map_.find("_class")) != map_.end()) { + klass.mapDecode(i->second.asMap()); + + packages[package][key] = klass; + } + } + } } } -void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const { - outBuf.checkAvailable(encodedSize()); - outBuf.putLong(brokerBank); - outBuf.putLong(agentBank); - outBuf.putShortString(routingKey); - // TODO aconway 2010-03-04: we send the v2Key instead of the - // ObjectId because that has the same meaning on different - // brokers. ObjectId::encode doesn't currently encode the v2Key, - // this can be cleaned up when it does. - outBuf.putMediumString(connectionRef.getV2Key()); - mgmtObject->writeProperties(outBuf); +void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const { + Variant::Map _objId, _values; + + map_["_brokerBank"] = brokerBank; + map_["_agentBank"] = agentBank; + map_["_routingKey"] = routingKey; + + connectionRef.mapEncode(_objId); + map_["_object_id"] = _objId; + + mgmtObject->mapEncodeValues(_values, true, false); + map_["_values"] = _values; } -void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) { - brokerBank = inBuf.getLong(); - agentBank = inBuf.getLong(); - inBuf.getShortString(routingKey); +void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { + Variant::Map::const_iterator i; + + if ((i = map_.find("_brokerBank")) != map_.end()) { + brokerBank = i->second; + } + + if ((i = map_.find("_agentBank")) != map_.end()) { + agentBank = i->second; + } + + if ((i = map_.find("_routingKey")) != map_.end()) { + routingKey = i->second.getString(); + } - // TODO aconway 2010-03-04: see comment in encode() - string connectionKey; - inBuf.getMediumString(connectionKey); - connectionRef = ObjectId(); // Clear out any existing value. - connectionRef.setV2Key(connectionKey); + if ((i = map_.find("_object_id")) != map_.end()) { + connectionRef.mapDecode(i->second.asMap()); + } mgmtObject = new _qmf::Agent(&agent, this); - mgmtObject->readProperties(inBuf); + + if ((i = map_.find("_values")) != map_.end()) { + mgmtObject->mapDecodeValues(i->second.asMap()); + } + // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key. mgmtObject->set_connectionRef(connectionRef); } -uint32_t ManagementAgent::RemoteAgent::encodedSize() const { - // TODO aconway 2010-03-04: see comment in encode() - return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long - + routingKey.size() + sizeof(uint8_t) // ShortString - + connectionRef.getV2Key().size() + sizeof(uint16_t) // medium string - + mgmtObject->writePropertiesSize(); -} - void ManagementAgent::exportAgents(std::string& out) { - out.clear(); + Variant::List list_; + Variant::Map map_, omap, amap; + for (RemoteAgentMap::const_iterator i = remoteAgents.begin(); i != remoteAgents.end(); ++i) { // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode RemoteAgent* agent = i->second; - size_t encodedSize = agent->encodedSize(); - size_t end = out.size(); - out.resize(end + encodedSize); - framing::Buffer outBuf(&out[end], encodedSize); - agent->encode(outBuf); + + map_.clear(); + amap.clear(); + + agent->mapEncode(amap); + map_["_remote_agent"] = amap; + list_.push_back(map_); } + + ListCodec::encode(list_, out); } void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { - while (inBuf.available()) { + string buf(inBuf.getPointer(), inBuf.available()); + Variant::List content; + ListCodec::decode(buf, content); + Variant::List::const_iterator l; + + for (l = content.begin(); l != content.end(); l++) { std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this)); - agent->decode(inBuf); - addObject(agent->mgmtObject, 0); - remoteAgents[agent->connectionRef] = agent.release(); + Variant::Map map_; + Variant::Map::const_iterator i; + + map_ = l->asMap(); + + if ((i = map_.find("_remote_agent")) != map_.end()) { + + agent->mapDecode(i->second.asMap()); + + addObject (agent->mgmtObject, 0, false); + remoteAgents[agent->connectionRef] = agent.release(); + } } } @@ -1519,3 +2337,198 @@ std::string ManagementAgent::debugSnapshot() { msg << " new objects: " << newManagementObjects.size(); return msg.str(); } + +Variant::Map ManagementAgent::toMap(const FieldTable& from) +{ + Variant::Map map; + + for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) { + const string& key(iter->first); + const FieldTable::ValuePtr& val(iter->second); + + map[key] = toVariant(val); + } + + return map; +} + +Variant::List ManagementAgent::toList(const List& from) +{ + Variant::List _list; + + for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) { + const List::ValuePtr& val(*iter); + + _list.push_back(toVariant(val)); + } + + return _list; +} + +qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from) +{ + qpid::framing::FieldTable ft; + + for (Variant::Map::const_iterator iter = from.begin(); + iter != from.end(); + iter++) { + const string& key(iter->first); + const Variant& val(iter->second); + + ft.set(key, toFieldValue(val)); + } + + return ft; +} + + +List ManagementAgent::fromList(const Variant::List& from) +{ + List fa; + + for (Variant::List::const_iterator iter = from.begin(); + iter != from.end(); + iter++) { + const Variant& val(*iter); + + fa.push_back(toFieldValue(val)); + } + + return fa; +} + + +boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in) +{ + + switch(in.getType()) { + + case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue()); + case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); + case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); + case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); + case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); + case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); + case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); + case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); + case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); + case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); + case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); + case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); + case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); + case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); + case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap()))); + case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList()))); + } + + QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]"); + return boost::shared_ptr<FieldValue>(new VoidValue()); +} + +// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup. +Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in) +{ + const std::string iso885915("iso-8859-15"); + const std::string utf8("utf8"); + const std::string utf16("utf16"); + //const std::string binary("binary"); + const std::string amqp0_10_binary("amqp0-10:binary"); + //const std::string amqp0_10_bit("amqp0-10:bit"); + const std::string amqp0_10_datetime("amqp0-10:datetime"); + const std::string amqp0_10_struct("amqp0-10:struct"); + Variant out; + + //based on AMQP 0-10 typecode, pick most appropriate variant type + switch (in->getType()) { + //Fixed Width types: + case 0x00: //bin8 + case 0x01: out.setEncoding(amqp0_10_binary); // int8 + case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8 + case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; // + // case 0x04: break; //TODO: iso-8859-15 char // char + case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8 + + case 0x10: out.setEncoding(amqp0_10_binary); // bin16 + case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16 + case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16 + + case 0x20: out.setEncoding(amqp0_10_binary); // bin32 + case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32 + case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32 + + case 0x23: out = in->get<float>(); break; // float(32) + + // case 0x27: break; //TODO: utf-32 char + + case 0x30: out.setEncoding(amqp0_10_binary); // bin64 + case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64 + + case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding + case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64 + case 0x33: out = in->get<double>(); break; // double + + case 0x48: // uuid + { + unsigned char data[16]; + in->getFixedWidthValue<16>(data); + out = qpid::types::Uuid(data); + } break; + + //TODO: figure out whether and how to map values with codes 0x40-0xd8 + + case 0xf0: break;//void, which is the default value for Variant + // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant + + //Variable Width types: + //strings: + case 0x80: // str8 + case 0x90: // str16 + case 0xa0: // str32 + out = in->get<std::string>(); + out.setEncoding(amqp0_10_binary); + break; + + case 0x84: // str8 + case 0x94: // str16 + out = in->get<std::string>(); + out.setEncoding(iso885915); + break; + + case 0x85: // str8 + case 0x95: // str16 + out = in->get<std::string>(); + out.setEncoding(utf8); + break; + + case 0x86: // str8 + case 0x96: // str16 + out = in->get<std::string>(); + out.setEncoding(utf16); + break; + + case 0xab: // str32 + out = in->get<std::string>(); + out.setEncoding(amqp0_10_struct); + break; + + case 0xa8: // map + out = ManagementAgent::toMap(in->get<FieldTable>()); + break; + + case 0xa9: // list of variant types + out = ManagementAgent::toList(in->get<List>()); + break; + //case 0xaa: //convert amqp0-10 array (uniform type) into variant list + // out = Variant::List(); + // translate<Array>(in, out.asList(), &toVariant); + // break; + + default: + //error? + QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]"); + break; + } + + return out; +} + diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 5b2c54f1b8..0250f39dd6 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -32,7 +32,9 @@ #include "qpid/management/ManagementEvent.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" +#include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> +#include <qpid/framing/FieldValue.h> #include <memory> #include <string> #include <map> @@ -62,7 +64,7 @@ public: } severity_t; - ManagementAgent (); + ManagementAgent (const bool qmfV1, const bool qmfV2); virtual ~ManagementAgent (); /** Called before plugins are initialized */ @@ -74,6 +76,9 @@ public: /** Called by cluster to suppress management output during update. */ void suppress(bool s) { suppressed = s; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); void setInterval(uint16_t _interval) { interval = _interval; } void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); @@ -91,6 +96,9 @@ public: ManagementObject::writeSchemaCall_t schemaCall); QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, uint64_t persistId = 0); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, + const std::string& key, + bool persistent = true); QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); @@ -99,7 +107,8 @@ public: bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, - const framing::FieldTable* args); + const framing::FieldTable* args, + const bool topic); const framing::Uuid& getUuid() const { return uuid; } @@ -128,6 +137,15 @@ public: uint16_t getBootSequence(void) { return bootSequence; } void setBootSequence(uint16_t b) { bootSequence = b; } + // TODO: remove these when Variant API moved into common library. + static types::Variant::Map toMap(const framing::FieldTable& from); + static framing::FieldTable fromMap(const types::Variant::Map& from); + static types::Variant::List toList(const framing::List& from); + static framing::List fromList(const types::Variant::List& from); + static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in); + static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); + + private: struct Periodic : public qpid::sys::TimerTask { @@ -153,9 +171,8 @@ private: ManagementObject* GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); - void encode(framing::Buffer& buffer) const; - void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); }; // TODO: Eventually replace string with entire reply-to structure. reply-to @@ -175,9 +192,11 @@ private: std::string name; uint8_t hash[16]; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); void encode(framing::Buffer& buffer) const; void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + uint32_t encodedBufSize() const; }; struct SchemaClassKeyComp @@ -209,9 +228,8 @@ private: bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } void appendSchema (framing::Buffer& buf); - void encode(framing::Buffer& buffer) const; - void decode(framing::Buffer& buffer); - uint32_t encodedSize() const; + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); }; typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; @@ -264,6 +282,14 @@ private: typedef std::map<MethodName, std::string> DisallowedMethods; DisallowedMethods disallowed; + // Agent name and address + qpid::types::Variant::Map attrMap; + std::string name_address; + + // supported management protocol + bool qmf1Support; + bool qmf2Support; + # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; @@ -279,6 +305,11 @@ private: uint32_t length, qpid::broker::Exchange::shared_ptr exchange, std::string routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); @@ -311,6 +342,10 @@ private: void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQueryLH (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType); + void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken); + void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid); + size_t validateSchema(framing::Buffer&, uint8_t kind); size_t validateTableSchema(framing::Buffer&); diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp index 0813e30891..6dc41ef073 100644 --- a/cpp/src/qpid/management/ManagementDirectExchange.cpp +++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -29,13 +29,16 @@ using namespace qpid::framing; using namespace qpid::sys; ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {} + Exchange (_name, _parent, b), + DirectExchange(_name, _parent, b), + managementAgent(0) {} ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : Exchange (_name, _durable, _args, _parent, b), - DirectExchange(_name, _durable, _args, _parent, b) {} + DirectExchange(_name, _durable, _args, _parent, b), + managementAgent(0) {} void ManagementDirectExchange::route(Deliverable& msg, const string& routingKey, @@ -43,7 +46,8 @@ void ManagementDirectExchange::route(Deliverable& msg, { bool routeIt = true; - // TODO: Intercept messages directed to the embedded agent and send them to the management agent. + if (managementAgent) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false /*direct*/); if (routeIt) DirectExchange::route(msg, routingKey, args); diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 4b87800174..46fc67d07f 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -22,7 +22,10 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/Buffer.h" #include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> #include <stdlib.h> @@ -36,26 +39,37 @@ void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) ((uint64_t) (bank & 0x0fffffff)); } -ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object) - : agent(0) +// Deprecated +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object) + : agent(0), agentEpoch(seq) { first = ((uint64_t) (flags & 0x0f)) << 60 | ((uint64_t) (seq & 0x0fff)) << 48 | - ((uint64_t) (broker & 0x000fffff)) << 28 | - ((uint64_t) (bank & 0x0fffffff)); + ((uint64_t) (broker & 0x000fffff)) << 28; second = object; } -ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object) - : agent(_agent) + +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker) + : agent(0), second(0), agentEpoch(seq) { first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28; +} + +ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq) + : agent(_agent), second(0), agentEpoch(seq) +{ + + first = ((uint64_t) (flags & 0x0f)) << 60 | ((uint64_t) (seq & 0x0fff)) << 48; - second = object; } + ObjectId::ObjectId(std::istream& in) : agent(0) { std::string text; @@ -75,6 +89,10 @@ void ObjectId::fromString(const std::string& text) # define atoll(X) _atoi64(X) #endif + // format: + // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id> + // V2: Not used + std::string copy(text.c_str()); char* cText; char* field[FIELDS]; @@ -99,10 +117,13 @@ void ObjectId::fromString(const std::string& text) if (idx != FIELDS) throw Exception("Invalid ObjectId format"); + agentEpoch = atoll(field[1]); + first = (atoll(field[0]) << 60) + (atoll(field[1]) << 48) + - (atoll(field[2]) << 28) + - atoll(field[3]); + (atoll(field[2]) << 28); + + agentName = std::string(field[3]); second = atoll(field[4]); } @@ -123,21 +144,40 @@ bool ObjectId::equalV1(const ObjectId &other) const return first == otherFirst && second == other.second; } -void ObjectId::encode(framing::Buffer& buffer) const +// encode as V1-format binary +void ObjectId::encode(std::string& buffer) const { + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + if (agent == 0) - buffer.putLongLong(first); + body.putLongLong(first); else - buffer.putLongLong(first | agent->first); - buffer.putLongLong(second); + body.putLongLong(first | agent->first); + body.putLongLong(second); + + body.reset(); + body.getRawData(buffer, len); } -void ObjectId::decode(framing::Buffer& buffer) +// decode as V1-format binary +void ObjectId::decode(const std::string& buffer) { - first = buffer.getLongLong(); - second = buffer.getLongLong(); + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + + body.checkAvailable(buffer.length()); + body.putRawData(buffer); + body.reset(); + first = body.getLongLong(); + second = body.getLongLong(); + v2Key = boost::lexical_cast<std::string>(second); } +// generate the V2 key from the index fields defined +// in the schema. void ObjectId::setV2Key(const ManagementObject& object) { std::stringstream oname; @@ -145,6 +185,42 @@ void ObjectId::setV2Key(const ManagementObject& object) v2Key = oname.str(); } +// encode as V2-format map +void ObjectId::mapEncode(types::Variant::Map& map) const +{ + map["_object_name"] = v2Key; + if (!agentName.empty()) + map["_agent_name"] = agentName; + if (agentEpoch) + map["_agent_epoch"] = agentEpoch; +} + +// decode as v2-format map +void ObjectId::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_object_name")) != map.end()) + v2Key = i->second.asString(); + else + throw Exception("Required _object_name field missing."); + + if ((i = map.find("_agent_name")) != map.end()) + agentName = i->second.asString(); + + if ((i = map.find("_agent_epoch")) != map.end()) + agentEpoch = i->second.asInt64(); +} + + +ObjectId::operator types::Variant::Map() const +{ + types::Variant::Map m; + mapEncode(m); + return m; +} + + namespace qpid { namespace management { @@ -158,7 +234,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) out << ((virtFirst & 0xF000000000000000LL) >> 60) << "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) << "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) << - "-" << (virtFirst & 0x000000000FFFFFFFLL) << + "-" << i.agentName << "-" << i.second; return out; } @@ -168,43 +244,88 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; -void ManagementObject::writeTimestamps (framing::Buffer& buf) const +void ManagementObject::writeTimestamps (std::string& buf) const { - buf.putShortString (getPackageName ()); - buf.putShortString (getClassName ()); - buf.putBin128 (getMd5Sum ()); - buf.putLongLong (updateTime); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); - objectId.encode(buf); + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); + + body.putShortString (getPackageName ()); + body.putShortString (getClassName ()); + body.putBin128 (getMd5Sum ()); + body.putLongLong (updateTime); + body.putLongLong (createTime); + body.putLongLong (destroyTime); + + uint32_t len = body.getPosition(); + body.reset(); + body.getRawData(buf, len); + + std::string oid; + objectId.encode(oid); + buf += oid; } -void ManagementObject::readTimestamps (framing::Buffer& buf) +void ManagementObject::readTimestamps (const std::string& buf) { + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); std::string unused; uint8_t unusedUuid[16]; - ObjectId unusedObjectId; - buf.getShortString(unused); - buf.getShortString(unused); - buf.getBin128(unusedUuid); - updateTime = buf.getLongLong(); - createTime = buf.getLongLong(); - destroyTime = buf.getLongLong(); - unusedObjectId.decode(buf); + body.checkAvailable(buf.length()); + body.putRawData(buf); + body.reset(); + + body.getShortString(unused); + body.getShortString(unused); + body.getBin128(unusedUuid); + updateTime = body.getLongLong(); + createTime = body.getLongLong(); + destroyTime = body.getLongLong(); } uint32_t ManagementObject::writeTimestampsSize() const { return 1 + getPackageName().length() + // str8 - 1 + getClassName().length() + // str8 - 16 + // bin128 - 8 + // uint64 - 8 + // uint64 - 8 + // uint64 - objectId.encodedSize(); // objectId + 1 + getClassName().length() + // str8 + 16 + // bin128 + 8 + // uint64 + 8 + // uint64 + 8 + // uint64 + objectId.encodedSize(); // objectId +} + + +void ManagementObject::writeTimestamps (types::Variant::Map& map) const +{ + types::Variant::Map oid, sid; + + sid["_package_name"] = getPackageName(); + sid["_class_name"] = getClassName(); + sid["_hash"] = qpid::types::Uuid(getMd5Sum()); + map["_schema_id"] = sid; + + objectId.mapEncode(oid); + map["_object_id"] = oid; + + map["_update_ts"] = updateTime; + map["_create_ts"] = createTime; + map["_delete_ts"] = destroyTime; +} + +void ManagementObject::readTimestamps (const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_update_ts")) != map.end()) + updateTime = i->second.asUint64(); + if ((i = map.find("_create_ts")) != map.end()) + createTime = i->second.asUint64(); + if ((i = map.find("_delete_ts")) != map.end()) + destroyTime = i->second.asUint64(); } + void ManagementObject::setReference(ObjectId) {} int ManagementObject::getThreadIndex() { @@ -217,3 +338,26 @@ int ManagementObject::getThreadIndex() { } return thisIndex; } + + +void ManagementObject::mapEncode(types::Variant::Map& map, + bool includeProperties, + bool includeStatistics) +{ + types::Variant::Map values; + + writeTimestamps(map); + + mapEncodeValues(values, includeProperties, includeStatistics); + map["_values"] = values; +} + +void ManagementObject::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + readTimestamps(map); + + if ((i = map.find("_values")) != map.end()) + mapDecodeValues(i->second.asMap()); +} diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp index 98650b3adf..7fdce133e5 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -28,13 +28,16 @@ using namespace qpid::framing; using namespace qpid::sys; ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} + Exchange (_name, _parent, b), + TopicExchange(_name, _parent, b), + managementAgent(0) {} ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, bool _durable, const FieldTable& _args, Manageable* _parent, Broker* b) : Exchange (_name, _durable, _args, _parent, b), - TopicExchange(_name, _durable, _args, _parent, b) {} + TopicExchange(_name, _durable, _args, _parent, b), + managementAgent(0) {} void ManagementTopicExchange::route(Deliverable& msg, const string& routingKey, @@ -43,12 +46,8 @@ void ManagementTopicExchange::route(Deliverable& msg, bool routeIt = true; // Intercept management agent commands - if (qmfVersion == 1) { - if ((routingKey.length() > 6 && - routingKey.substr(0, 6).compare("agent.") == 0) || - (routingKey == "broker")) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - } + if (managementAgent) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true /* topic */); if (routeIt) TopicExchange::route(msg, routingKey, args); |
