diff options
| author | Ted Ross <tross@apache.org> | 2008-07-11 20:14:07 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-07-11 20:14:07 +0000 |
| commit | 2fd1b08b605d2664394ff5708c3cbaebd1dc21ef (patch) | |
| tree | d0b5c7cfa8f31a1fc721fb45d7ca77a027875b7d /cpp/src/qpid/management/ManagementBroker.cpp | |
| parent | 13e2db2a3d0d14881da3c088f084385740df0731 (diff) | |
| download | qpid-python-2fd1b08b605d2664394ff5708c3cbaebd1dc21ef.tar.gz | |
QPID-1174 Remote Management Agent for management of external components
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@676067 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 201 |
1 files changed, 163 insertions, 38 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 106033f76f..84e0c650f2 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea nextObjectId = 1; bootSequence = 1; nextRemoteBank = 10; + nextRequestSequence = 1; clientWasAdded = false; // Get from file or generate and save to file. @@ -155,8 +156,8 @@ void ManagementBroker::RegisterClass (string packageName, ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock (userLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); + PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + AddClass(pIter, className, md5Sum, schemaCall); } uint64_t ManagementBroker::addObject (ManagementObject* object, @@ -200,6 +201,17 @@ void ManagementBroker::clientAdded (void) Mutex::ScopedLock lock (userLock); clientWasAdded = true; + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'x'); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); + } } void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) @@ -512,8 +524,12 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_ sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::handlePackageIndLH (Buffer& /*inBuffer*/, string /*replyToKey*/, uint32_t /*sequence*/) +void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) { + std::string packageName; + + inBuffer.getShortString(packageName); + FindOrAddPackageLH(packageName); } void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -529,7 +545,7 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, cIter != cMap.end (); cIter++) { - if (cIter->second.hasSchema ()) + if (cIter->second->hasSchema ()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -546,16 +562,46 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::SchemaClass::appendSchema (Buffer& buf) +void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) +{ + std::string packageName; + SchemaClassKey key; + + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + ClassMap::iterator cIter = pIter->second.find(key); + if (cIter == pIter->second.end()) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint32_t sequence = nextRequestSequence++; + + EncodeHeader (outBuffer, 'S', sequence); + outBuffer.putShortString(packageName); + outBuffer.putShortString(key.name); + outBuffer.putBin128(key.hash); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + + SchemaClass* newSchema = new SchemaClass; + newSchema->pendingSequence = sequence; + pIter->second[key] = newSchema; + } +} + +void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) { // If the management package is attached locally (embedded in the broker or // linked in via plug-in), call the schema handler directly. If the package // is from a remote management agent, send the stored schema information. if (writeSchemaCall != 0) - writeSchemaCall (buf); + writeSchemaCall(buf); else - buf.putRawData (buffer, bufferLen); + buf.putRawData(buffer, bufferLen); } void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -568,22 +614,19 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getBin128 (key.hash); PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) - { + if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) - { + if (cIter != cMap.end()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - SchemaClass classInfo = cIter->second; + SchemaClass* classInfo = cIter->second; - if (classInfo.hasSchema()) - { - EncodeHeader (outBuffer, 's', sequence); - classInfo.appendSchema (outBuffer); + if (classInfo->hasSchema()) { + EncodeHeader(outBuffer, 's', sequence); + classInfo->appendSchema (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); + outBuffer.reset(); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } else @@ -596,6 +639,44 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe sendCommandComplete (replyToKey, sequence, 1, "Package not found"); } +void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.record(); + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + inBuffer.restore(); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { + size_t length = ValidateSchema(inBuffer); + if (length == 0) + cMap.erase(key); + else { + cIter->second->buffer = (uint8_t*) malloc(length); + cIter->second->bufferLen = length; + inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); + + // Publish a class-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q'); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + } + } + } +} + bool ManagementBroker::bankInUse (uint32_t bank) { for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); @@ -628,16 +709,16 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe string label; uint32_t requestedBank; uint32_t assignedBank; - Uuid sessionId; + string sessionName; Uuid systemId; inBuffer.getShortString (label); - sessionId.decode (inBuffer); + inBuffer.getShortString (sessionName); systemId.decode (inBuffer); requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); - RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId); + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. @@ -645,17 +726,21 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe return; } + // TODO: Reject requests for which the session name does not match an existing session. + RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; + agent->routingKey = replyToKey; agent->mgmtObject = new management::Agent (this, agent); - agent->mgmtObject->set_sessionId (sessionId); + agent->mgmtObject->set_sessionName (sessionName); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); addObject (agent->mgmtObject); - remoteAgents[sessionId] = agent; + remoteAgents[sessionName] = agent; + // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -734,16 +819,18 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) if (!CheckHeader (inBuffer, &opcode, &sequence)) return; - if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); - //else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); } -ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name) +ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) @@ -767,10 +854,10 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std:: return result.first; } -void ManagementBroker::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) +void ManagementBroker::AddClass(PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -785,12 +872,11 @@ void ManagementBroker::AddClassLocal (PackageMap::iterator pIter, // No such class found, create a new class with local information. QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - SchemaClass classInfo; + SchemaClass* classInfo = new SchemaClass; - classInfo.writeSchemaCall = schemaCall; + classInfo->writeSchemaCall = schemaCall; cMap[key] = classInfo; - - // TODO: Publish a class-indication message + cIter = cMap.find (key); } void ManagementBroker::EncodePackageIndication (Buffer& buf, @@ -810,3 +896,42 @@ void ManagementBroker::EncodeClassIndication (Buffer& buf, buf.putBin128 (key.hash); } +size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + inBuffer.record(); + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); + uint16_t evntCount = inBuffer.getShort(); + + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } + + if (evntCount != 0) + return 0; + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} |
