/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "ManagementBroker.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" #include #include #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" #include #include #include using boost::intrusive_ptr; using qpid::framing::Uuid; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::broker; using namespace qpid::sys; using namespace std; Mutex ManagementAgent::Singleton::lock; bool ManagementAgent::Singleton::disabled = false; ManagementAgent* ManagementAgent::Singleton::agent = 0; int ManagementAgent::Singleton::refCount = 0; ManagementAgent::Singleton::Singleton(bool disableManagement) { Mutex::ScopedLock _lock(lock); if (disableManagement && !disabled) { disabled = true; assert(refCount == 0); // can't disable after agent has been allocated } if (refCount == 0 && !disabled) agent = new ManagementBroker(); refCount++; } ManagementAgent::Singleton::~Singleton() { Mutex::ScopedLock _lock(lock); refCount--; if (refCount == 0 && !disabled) { delete agent; agent = 0; } } ManagementAgent* ManagementAgent::Singleton::getInstance() { return agent; } ManagementBroker::RemoteAgent::~RemoteAgent () { if (mgmtObject != 0) mgmtObject->resourceDestroy(); } ManagementBroker::ManagementBroker () : threadPoolSize(1), interval(10), broker(0) { localBank = 5; nextObjectId = 1; bootSequence = 1; nextRemoteBank = 10; nextRequestSequence = 1; clientWasAdded = false; } ManagementBroker::~ManagementBroker () { Mutex::ScopedLock lock (userLock); // Reset the shared pointers to exchanges. If this is not done now, the exchanges // will stick around until dExchange and mExchange are implicitely destroyed (long // after this destructor completes). Those exchanges hold references to management // objects that will be invalid. dExchange.reset(); mExchange.reset(); timer.stop(); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) { ManagementObject* object = iter->second; delete object; } managementObjects.clear(); } void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads) { dataDir = _dataDir; interval = _interval; broker = _broker; threadPoolSize = _threads; timer.add (intrusive_ptr (new Periodic(*this, interval))); // Get from file or generate and save to file. if (dataDir.empty()) { uuid.generate(); QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " << uuid); } else { string filename(dataDir + "/.mbrokerdata"); ifstream inFile(filename.c_str ()); if (inFile.good()) { inFile >> uuid; inFile >> bootSequence; inFile >> nextRemoteBank; inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); bootSequence++; writeData(); } else { uuid.generate(); QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); writeData(); } QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); } } void ManagementBroker::writeData () { string filename (dataDir + "/.mbrokerdata"); ofstream outFile (filename.c_str ()); if (outFile.good()) { outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; outFile.close(); } } void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange, broker::Exchange::shared_ptr _dexchange) { mExchange = _mexchange; dExchange = _dexchange; } void ManagementBroker::RegisterClass (string packageName, string className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = FindOrAddPackageLH(packageName); AddClass(pIter, className, md5Sum, schemaCall); } uint64_t ManagementBroker::addObject (ManagementObject* object, uint32_t persistId, uint32_t persistBank) { Mutex::ScopedLock lock (addLock); uint64_t objectId; if (persistId == 0) { objectId = ((uint64_t) bootSequence) << 48 | ((uint64_t) localBank) << 24 | nextObjectId++; if ((nextObjectId & 0xFF000000) != 0) { nextObjectId = 1; localBank++; } } else objectId = ((uint64_t) persistBank) << 24 | persistId; object->setObjectId (objectId); newManagementObjects[objectId] = object; return objectId; } ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {} ManagementBroker::Periodic::~Periodic () {} void ManagementBroker::Periodic::fire () { broker.timer.add (intrusive_ptr (new Periodic (broker, broker.interval))); broker.PeriodicProcessing (); } void ManagementBroker::clientAdded (void) { Mutex::ScopedLock lock (userLock); clientWasAdded = true; for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'x'); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); } } void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); buf.putOctet ('M'); buf.putOctet ('1'); buf.putOctet (opcode); buf.putLong (seq); } bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) { uint8_t h1 = buf.getOctet (); uint8_t h2 = buf.getOctet (); uint8_t h3 = buf.getOctet (); *opcode = buf.getOctet (); *seq = buf.getLong (); return h1 == 'A' && h2 == 'M' && h3 == '1'; } void ManagementBroker::SendBuffer (Buffer& buf, uint32_t length, broker::Exchange::shared_ptr exchange, string routingKey) { if (exchange.get() == 0) return; intrusive_ptr msg (new Message ()); AMQFrame method (in_place( ProtocolVersion(), exchange->getName (), 0, 0)); AMQFrame header (in_place()); AMQFrame content(in_place()); content.castBody()->decode(buf, length); method.setEof (false); header.setBof (false); header.setEof (false); content.setBof (false); msg->getFrames().append(method); msg->getFrames().append(header); MessageProperties* props = msg->getFrames().getHeaders()->get(true); props->setContentLength(length); msg->getFrames().append(content); DeliverableMessage deliverable (msg); exchange->route (deliverable, routingKey, 0); } void ManagementBroker::moveNewObjectsLH() { Mutex::ScopedLock lock (addLock); for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); iter != newManagementObjects.end (); iter++) managementObjects[iter->first] = iter->second; newManagementObjects.clear(); } void ManagementBroker::PeriodicProcessing (void) { #define BUFSIZE 65536 Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; std::list deleteList; { Buffer msgBuffer(msgChars, BUFSIZE); EncodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str() + ".heartbeat"; SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } moveNewObjectsLH(); if (clientWasAdded) { clientWasAdded = false; for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) { ManagementObject* object = iter->second; object->setAllChanged (); } } if (managementObjects.empty ()) return; for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) { ManagementObject* object = iter->second; if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); EncodeHeader (msgBuffer, 'c'); object->writeProperties(msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName (); SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); EncodeHeader (msgBuffer, 'i'); object->writeStatistics(msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName (); SendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->isDeleted ()) deleteList.push_back (iter->first); } // Delete flagged objects for (std::list::reverse_iterator iter = deleteList.rbegin (); iter != deleteList.rend (); iter++) managementObjects.erase (*iter); if (!deleteList.empty()) { deleteList.clear(); deleteOrphanedAgentsLH(); } } void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, uint32_t code, string text) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'z', sequence); outBuffer.putLong (code); outBuffer.putShortString (text); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } bool ManagementBroker::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); // Parse the routing key. This management broker should act as though it // is bound to the exchange to match the following keys: // // agent..# // broker.# // // where is any non-negative decimal integer less than the lowest remote // object-id bank. if (routingKey == "broker") { dispatchAgentCommandLH (msg); return false; } else if (routingKey.compare(0, 6, "agent.") == 0) { std::string::size_type delim = routingKey.find('.', 6); if (delim == string::npos) delim = routingKey.length(); string bank = routingKey.substr(6, delim - 6); if ((uint32_t) atoi(bank.c_str()) <= localBank) { dispatchAgentCommandLH (msg); return false; } } return true; } void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) { string methodName; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; uint64_t objId = inBuffer.getLongLong(); inBuffer.getShortString(methodName); std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl; EncodeHeader(outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); } else { iter->second->doMethod(methodName, inBuffer, outBuffer); } outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); SendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) { for (PackageMap::iterator pIter = packages.begin (); pIter != packages.end (); pIter++) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'p', sequence); EncodePackageIndication (outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } sendCommandComplete (replyToKey, sequence); } void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) { std::string packageName; inBuffer.getShortString(packageName); FindOrAddPackageLH(packageName); } void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) { std::string packageName; inBuffer.getShortString (packageName); PackageMap::iterator pIter = packages.find (packageName); if (pIter != packages.end ()) { ClassMap cMap = pIter->second; for (ClassMap::iterator cIter = cMap.begin (); cIter != cMap.end (); cIter++) { if (cIter->second->hasSchema ()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'q', sequence); EncodeClassIndication (outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } } } sendCommandComplete (replyToKey, sequence); } void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) { std::string packageName; SchemaClassKey key; inBuffer.getShortString(packageName); inBuffer.getShortString(key.name); inBuffer.getBin128(key.hash); PackageMap::iterator pIter = FindOrAddPackageLH(packageName); ClassMap::iterator cIter = pIter->second.find(key); if (cIter == pIter->second.end()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; uint32_t sequence = nextRequestSequence++; EncodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); outBuffer.putShortString(key.name); outBuffer.putBin128(key.hash); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); SchemaClass* newSchema = new SchemaClass; newSchema->pendingSequence = sequence; pIter->second[key] = newSchema; } } void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) { // If the management package is attached locally (embedded in the broker or // linked in via plug-in), call the schema handler directly. If the package // is from a remote management agent, send the stored schema information. if (writeSchemaCall != 0) writeSchemaCall(buf); else buf.putRawData(buffer, bufferLen); } void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; inBuffer.getShortString (packageName); inBuffer.getShortString (key.name); inBuffer.getBin128 (key.hash); PackageMap::iterator pIter = packages.find (packageName); if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find (key); if (cIter != cMap.end()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; SchemaClass* classInfo = cIter->second; if (classInfo->hasSchema()) { EncodeHeader(outBuffer, 's', sequence); classInfo->appendSchema (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } else sendCommandComplete (replyToKey, sequence, 1, "Schema not available"); } else sendCommandComplete (replyToKey, sequence, 1, "Class key not found"); } else sendCommandComplete (replyToKey, sequence, 1, "Package not found"); } void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; inBuffer.record(); inBuffer.getShortString (packageName); inBuffer.getShortString (key.name); inBuffer.getBin128 (key.hash); inBuffer.restore(); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) { size_t length = ValidateSchema(inBuffer); if (length == 0) cMap.erase(key); else { cIter->second->buffer = (uint8_t*) malloc(length); cIter->second->bufferLen = length; inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen); // Publish a class-indication message Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'q'); EncodeClassIndication (outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); } } } } bool ManagementBroker::bankInUse (uint32_t bank) { for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) if (aIter->second->objIdBank == bank) return true; return false; } uint32_t ManagementBroker::allocateNewBank () { while (bankInUse (nextRemoteBank)) nextRemoteBank++; uint32_t allocated = nextRemoteBank++; writeData (); return allocated; } uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) { if (requestedBank == 0 || bankInUse (requestedBank)) return allocateNewBank (); return requestedBank; } void ManagementBroker::deleteOrphanedAgentsLH() { vector deleteList; for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { uint64_t connectionRef = aIter->first; bool found = false; for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) { if (iter->first == connectionRef && !iter->second->isDeleted()) { found = true; break; } } if (!found) { deleteList.push_back(connectionRef); delete aIter->second; } } for (vector::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) { remoteAgents.erase(*dIter); } deleteList.clear(); } void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; uint32_t requestedBank; uint32_t assignedBank; uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; moveNewObjectsLH(); deleteOrphanedAgentsLH(); RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent"); return; } inBuffer.getShortString (label); systemId.decode (inBuffer); requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; agent->routingKey = replyToKey; agent->connectionRef = connectionRef; agent->mgmtObject = new management::Agent (this, agent); agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); agent->mgmtObject->set_objectIdBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[connectionRef] = agent; // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) { FieldTable ft; FieldTable::ValuePtr value; moveNewObjectsLH(); ft.decode(inBuffer); value = ft.get("_class"); if (value.get() == 0 || !value->convertsTo()) { // TODO: Send completion with an error code return; } string className (value->get()); for (ManagementObjectMap::iterator iter = managementObjects.begin (); iter != managementObjects.end (); iter++) { ManagementObject* object = iter->second; if (object->getClassName () == className) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, dExchange, replyToKey); } } sendCommandComplete (replyToKey, sequence); } void ManagementBroker::dispatchAgentCommandLH (Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; uint32_t sequence; string replyToKey; const framing::MessageProperties* p = msg.getFrames().getHeaders()->get(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); replyToKey = rt.getRoutingKey(); } else return; if (msg.encodedSize() > MA_BUFFER_SIZE) { QPID_LOG(debug, "ManagementBroker::dispatchAgentCommandLH: Message too large: " << msg.encodedSize()); return; } msg.encodeContent(inBuffer); inBuffer.reset(); if (!CheckHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) return pIter; // No such package found, create a new map entry. pair result = packages.insert (pair (name, ClassMap ())); QPID_LOG (debug, "ManagementBroker added package " << name); // Publish a package-indication message Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; EncodeHeader (outBuffer, 'p'); EncodePackageIndication (outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); return result.first; } void ManagementBroker::AddClass(PackageMap::iterator pIter, string className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; key.name = className; memcpy (&key.hash, md5Sum, 16); ClassMap::iterator cIter = cMap.find (key); if (cIter != cMap.end ()) return; // No such class found, create a new class with local information. QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); SchemaClass* classInfo = new SchemaClass; classInfo->writeSchemaCall = schemaCall; cMap[key] = classInfo; cIter = cMap.find (key); } void ManagementBroker::EncodePackageIndication (Buffer& buf, PackageMap::iterator pIter) { buf.putShortString ((*pIter).first); } void ManagementBroker::EncodeClassIndication (Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; buf.putShortString ((*pIter).first); buf.putShortString (key.name); buf.putBin128 (key.hash); } size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) { uint32_t start = inBuffer.getPosition(); uint32_t end; string text; uint8_t hash[16]; inBuffer.record(); inBuffer.getShortString(text); inBuffer.getShortString(text); inBuffer.getBin128(hash); uint16_t propCount = inBuffer.getShort(); uint16_t statCount = inBuffer.getShort(); uint16_t methCount = inBuffer.getShort(); uint16_t evntCount = inBuffer.getShort(); for (uint16_t idx = 0; idx < propCount + statCount; idx++) { FieldTable ft; ft.decode(inBuffer); } for (uint16_t idx = 0; idx < methCount; idx++) { FieldTable ft; ft.decode(inBuffer); int argCount = ft.getInt("argCount"); for (int mIdx = 0; mIdx < argCount; mIdx++) { FieldTable aft; aft.decode(inBuffer); } } if (evntCount != 0) return 0; end = inBuffer.getPosition(); inBuffer.restore(); // restore original position return end - start; }