diff options
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 176 |
1 files changed, 60 insertions, 116 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index f66b34c43c..223811ebc2 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -97,6 +97,7 @@ ManagementBroker::~ManagementBroker () // objects that will be invalid. dExchange.reset(); mExchange.reset(); + timer.stop(); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); @@ -117,33 +118,33 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. - if (dataDir.empty ()) + if (dataDir.empty()) { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " << uuid); } else { - string filename (dataDir + "/.mbrokerdata"); - ifstream inFile (filename.c_str ()); + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); - if (inFile.good ()) + if (inFile.good()) { inFile >> uuid; inFile >> bootSequence; inFile >> nextRemoteBank; - inFile.close (); + inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); bootSequence++; - writeData (); + writeData(); } else { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); - writeData (); + writeData(); } QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); @@ -155,10 +156,10 @@ void ManagementBroker::writeData () string filename (dataDir + "/.mbrokerdata"); ofstream outFile (filename.c_str ()); - if (outFile.good ()) + if (outFile.good()) { outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; - outFile.close (); + outFile.close(); } } @@ -174,7 +175,7 @@ void ManagementBroker::RegisterClass (string packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock (userLock); + Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = FindOrAddPackageLH(packageName); AddClass(pIter, className, md5Sum, schemaCall); } @@ -391,124 +392,64 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::dispatchCommand (Deliverable& deliverable, +bool ManagementBroker::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - if (routingKey.compare (0, 13, "agent.method.") == 0) - dispatchMethodLH (msg, routingKey, 13); + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.<X>.# + // broker.# + // + // where <X> is any non-negative decimal integer less than the lowest remote + // object-id bank. - else if (routingKey.length () == 5 && - routingKey.compare (0, 5, "agent") == 0) + if (routingKey == "broker") { dispatchAgentCommandLH (msg); - - else - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } -} - -void ManagementBroker::dispatchMethodLH (Message& msg, - const string& routingKey, - size_t first) -{ - size_t pos, start = first; - uint32_t contentSize; - - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); - return; + return false; } - string packageName = routingKey.substr (start, pos - start); - - start = pos + 1; - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); - return; + else if (routingKey.compare(0, 6, "agent.") == 0) { + uint32_t delim = routingKey.find('.', 6); + if (delim == string::npos) + delim = routingKey.length(); + string bank = routingKey.substr(6, delim - 6); + if ((uint32_t) atoi(bank.c_str()) <= localBank) { + dispatchAgentCommandLH (msg); + return false; + } } - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - string methodName = routingKey.substr (start, routingKey.length () - start); - - contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) - return; + return true; +} - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string methodName; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen, sequence; - uint8_t opcode; - - if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " << - msg.encodedSize()); - return; - } - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - { - QPID_LOG (debug, " Invalid content header"); - return; - } - - if (opcode != 'M') - { - QPID_LOG (debug, " Unexpected opcode " << opcode); - return; - } - - uint64_t objId = inBuffer.getLongLong (); - string replyToKey; + uint32_t outLen; - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - { - QPID_LOG (debug, " Reply-to missing"); - return; - } + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); - EncodeHeader (outBuffer, 'm', sequence); + std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl; + EncodeHeader(outBuffer, 'm', sequence); - ManagementObjectMap::iterator iter = managementObjects.find (objId); - if (iter == managementObjects.end () || iter->second->isDeleted ()) - { + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } - else - { - iter->second->doMethod (methodName, inBuffer, outBuffer); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -737,6 +678,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); + // TODO: Make a pass over the agents and delete any that no longer have a session. + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); if (aIter != remoteAgents.end()) { @@ -755,6 +698,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_objectIdBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[sessionName] = agent; @@ -818,10 +762,9 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); } else return; @@ -832,10 +775,10 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) return; } - msg.encodeContent (inBuffer); - inBuffer.reset (); + msg.encodeContent(inBuffer); + inBuffer.reset(); - if (!CheckHeader (inBuffer, &opcode, &sequence)) + if (!CheckHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); @@ -847,6 +790,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) |
