diff options
| author | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
| commit | 9d199b74aee76859480a7ee92d95c6db42028b43 (patch) | |
| tree | ca09aace4aaac2afa9650cc78833d30b056313a9 /cpp/src/qpid/management/ManagementBroker.cpp | |
| parent | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (diff) | |
| download | qpid-python-9d199b74aee76859480a7ee92d95c6db42028b43.tar.gz | |
QPID-1327 - Event support for Management
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 328 |
1 files changed, 191 insertions, 137 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 3ae98e8264..0e046bb813 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -179,14 +179,24 @@ void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchang dExchange = _dexchange; } -void ManagementBroker::RegisterClass (string packageName, - string className, +void ManagementBroker::registerClass (string& packageName, + string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(userLock); - PackageMap::iterator pIter = FindOrAddPackageLH(packageName); - AddClass(pIter, className, md5Sum, schemaCall); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); +} + +void ManagementBroker::registerEvent (string& packageName, + string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } ObjectId ManagementBroker::addObject (ManagementObject* object, @@ -211,6 +221,23 @@ ObjectId ManagementBroker::addObject (ManagementObject* object, return objId; } +void ManagementBroker::raiseEvent(const ManagementEvent& event) +{ + Mutex::ScopedLock lock (userLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + event.encode(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, "mgmt.event"); +} + ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {} @@ -219,7 +246,7 @@ ManagementBroker::Periodic::~Periodic () {} void ManagementBroker::Periodic::fire () { broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval))); - broker.PeriodicProcessing (); + broker.periodicProcessing (); } void ManagementBroker::clientAdded (void) @@ -233,35 +260,35 @@ void ManagementBroker::clientAdded (void) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'x'); + encodeHeader (outBuffer, 'x'); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); + sendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); } } -void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); buf.putOctet ('M'); - buf.putOctet ('1'); + buf.putOctet ('2'); buf.putOctet (opcode); buf.putLong (seq); } -bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) { - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); - *opcode = buf.getOctet (); - *seq = buf.getLong (); + *opcode = buf.getOctet(); + *seq = buf.getLong(); - return h1 == 'A' && h2 == 'M' && h3 == '1'; + return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementBroker::SendBuffer (Buffer& buf, +void ManagementBroker::sendBuffer (Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, string routingKey) @@ -304,7 +331,7 @@ void ManagementBroker::moveNewObjectsLH() newManagementObjects.clear(); } -void ManagementBroker::PeriodicProcessing (void) +void ManagementBroker::periodicProcessing (void) { #define BUFSIZE 65536 Mutex::ScopedLock lock (userLock); @@ -315,13 +342,13 @@ void ManagementBroker::PeriodicProcessing (void) { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'h'); + encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str() + ".heartbeat"; - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } moveNewObjectsLH(); @@ -350,25 +377,25 @@ void ManagementBroker::PeriodicProcessing (void) if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); + encodeHeader (msgBuffer, 'c'); object->writeProperties(msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); + encodeHeader (msgBuffer, 'i'); object->writeStatistics(msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->isDeleted ()) @@ -393,12 +420,12 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'z', sequence); + encodeHeader (outBuffer, 'z', sequence); outBuffer.putLong (code); outBuffer.putShortString (text); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } bool ManagementBroker::dispatchCommand (Deliverable& deliverable, @@ -411,7 +438,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, // Parse the routing key. This management broker should act as though it // is bound to the exchange to match the following keys: // - // agent.0.# + // agent.1.0.# // broker if (routingKey == "broker") { @@ -419,12 +446,12 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, return false; } - else if (routingKey.compare(0, 7, "agent.0") == 0) { + else if (routingKey.compare(0, 9, "agent.1.0") == 0) { dispatchAgentCommandLH(msg); return false; } - else if (routingKey.compare(0, 6, "agent.") == 0) { + else if (routingKey.compare(0, 8, "agent.1.") == 0) { return authorizeAgentMessageLH(msg); } @@ -447,7 +474,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getShortString(className); inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); if (acl != 0) { string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); @@ -460,7 +487,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); return; } } @@ -476,12 +503,19 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); } else - iter->second->doMethod(methodName, inBuffer, outBuffer); + try { + outBuffer.record(); + iter->second->doMethod(methodName, inBuffer, outBuffer); + } catch(std::exception& e) { + outBuffer.restore(); + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putShortString(e.what()); + } } outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -489,12 +523,12 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32 Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'b', sequence); + encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) @@ -506,11 +540,11 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p', sequence); - EncodePackageIndication (outBuffer, pIter); + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } sendCommandComplete (replyToKey, sequence); @@ -521,7 +555,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey std::string packageName; inBuffer.getShortString(packageName); - FindOrAddPackageLH(packageName); + findOrAddPackageLH(packageName); } void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -542,11 +576,11 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'q', sequence); - EncodeClassIndication(outBuffer, pIter, cIter); + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } } } @@ -558,26 +592,27 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui std::string packageName; SchemaClassKey key; + uint8_t kind = inBuffer.getOctet(); inBuffer.getShortString(packageName); inBuffer.getShortString(key.name); inBuffer.getBin128(key.hash); - PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); ClassMap::iterator cIter = pIter->second.find(key); if (cIter == pIter->second.end()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; uint32_t sequence = nextRequestSequence++; - EncodeHeader (outBuffer, 'S', sequence); + encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); outBuffer.putShortString(key.name); outBuffer.putBin128(key.hash); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); - pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence))); + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence))); } } @@ -612,11 +647,11 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey SchemaClass& classInfo = cIter->second; if (classInfo.hasSchema()) { - EncodeHeader(outBuffer, 's', sequence); + encodeHeader(outBuffer, 's', sequence); classInfo.appendSchema(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } else sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); @@ -634,9 +669,10 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo SchemaClassKey key; inBuffer.record(); - inBuffer.getShortString (packageName); - inBuffer.getShortString (key.name); - inBuffer.getBin128 (key.hash); + inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); inBuffer.restore(); PackageMap::iterator pIter = packages.find(packageName); @@ -644,7 +680,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { - size_t length = ValidateSchema(inBuffer); + size_t length = validateSchema(inBuffer, cIter->second.kind); if (length == 0) { QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); @@ -658,11 +694,11 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'q'); - EncodeClassIndication(outBuffer, pIter, cIter); + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + sendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); } } } @@ -727,7 +763,7 @@ void ManagementBroker::deleteOrphanedAgentsLH() void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; - uint32_t requestedBank; + uint32_t requestedBrokerBank, requestedAgentBank; uint32_t assignedBank; ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; @@ -737,14 +773,15 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent"); + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); return; } - inBuffer.getShortString (label); - systemId.decode (inBuffer); - requestedBank = inBuffer.getLong (); - assignedBank = assignBankLH (requestedBank); + inBuffer.getShortString(label); + systemId.decode(inBuffer); + requestedBrokerBank = inBuffer.getLong(); + requestedAgentBank = inBuffer.getLong(); + assignedBank = assignBankLH(requestedAgentBank); RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; @@ -755,7 +792,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); - agent->mgmtObject->set_objectIdBank (assignedBank); + agent->mgmtObject->set_brokerBank (brokerBank); + agent->mgmtObject->set_agentBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[connectionRef] = agent; @@ -764,12 +802,12 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'a', sequence); + encodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -799,12 +837,12 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'g', sequence); + encodeHeader (outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } } @@ -824,7 +862,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg) msg.encodeContent(inBuffer); inBuffer.reset(); - if (!CheckHeader(inBuffer, &opcode, &sequence)) + if (!checkHeader(inBuffer, &opcode, &sequence)) return false; if (opcode == 'M') { @@ -861,12 +899,12 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg) Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } return false; @@ -900,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) msg.encodeContent(inBuffer); inBuffer.reset(); - if (!CheckHeader(inBuffer, &opcode, &sequence)) + if (!checkHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); @@ -915,7 +953,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); } -ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) +ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(std::string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) @@ -930,19 +968,20 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std: Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); + encodeHeader (outBuffer, 'p'); + encodePackageIndication (outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); + sendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); return result.first; } -void ManagementBroker::AddClass(PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) +void ManagementBroker::addClassLH(uint8_t kind, + PackageMap::iterator pIter, + string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -958,71 +997,76 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter, QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall))); + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); cIter = cMap.find(key); } -void ManagementBroker::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) +void ManagementBroker::encodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) { - buf.putShortString ((*pIter).first); + buf.putShortString((*pIter).first); } -void ManagementBroker::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) +void ManagementBroker::encodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); + buf.putOctet((*cIter).second.kind); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128(key.hash); +} + +size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind) +{ + if (kind == ManagementItem::CLASS_KIND_TABLE) + return validateTableSchema(inBuffer); + else if (kind == ManagementItem::CLASS_KIND_EVENT) + return validateEventSchema(inBuffer); + return 0; } -size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) +size_t ManagementBroker::validateTableSchema(Buffer& inBuffer) { uint32_t start = inBuffer.getPosition(); uint32_t end; string text; uint8_t hash[16]; - inBuffer.record(); - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_TABLE) + return 0; - uint16_t propCount = inBuffer.getShort(); - uint16_t statCount = inBuffer.getShort(); - uint16_t methCount = inBuffer.getShort(); - uint16_t evntCount = inBuffer.getShort(); + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); - for (uint16_t idx = 0; idx < propCount + statCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - } + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); - for (uint16_t idx = 0; idx < methCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - if (!ft.isSet("argCount")) - return 0; - int argCount = ft.getInt("argCount"); - for (int mIdx = 0; mIdx < argCount; mIdx++) { - FieldTable aft; - aft.decode(inBuffer); + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); } - } - for (uint16_t idx = 0; idx < evntCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - if (!ft.isSet("argCount")) - return 0; - int argCount = ft.getInt("argCount"); - for (int mIdx = 0; mIdx < argCount; mIdx++) { - FieldTable aft; - aft.decode(inBuffer); + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } } + } catch (std::exception& e) { + return 0; } end = inBuffer.getPosition(); @@ -1030,24 +1074,34 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) return end - start; } -Mutex& ManagementBroker::getMutex() +size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) { - return userLock; -} + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; -Buffer* ManagementBroker::startEventLH() -{ - Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); - EncodeHeader(*outBuffer, 'e'); - outBuffer->putLongLong(uint64_t(Duration(now()))); - return outBuffer; -} + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_EVENT) + return 0; -void ManagementBroker::finishEventLH(Buffer* outBuffer) -{ - uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); - outBuffer->reset(); - SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event"); - delete outBuffer; -} + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t argCount = inBuffer.getShort(); + for (uint16_t idx = 0; idx < argCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + } catch (std::exception& e) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} |
