diff options
| author | Ted Ross <tross@apache.org> | 2008-07-31 13:15:16 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-07-31 13:15:16 +0000 |
| commit | 9fb5fd7a0a800591c334bde2b9556e984217d7de (patch) | |
| tree | c632c481f9cbf647d4ce453ff1076895866fc5e7 /cpp/src/qpid/management | |
| parent | 033f088884f2e6bbc08d6027e1507b6d67eaad53 (diff) | |
| download | qpid-python-9fb5fd7a0a800591c334bde2b9556e984217d7de.tar.gz | |
QPID-1174 - Management updates for remote agents
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681362 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 176 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 4 |
4 files changed, 72 insertions, 131 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index f66b34c43c..223811ebc2 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -97,6 +97,7 @@ ManagementBroker::~ManagementBroker () // objects that will be invalid. dExchange.reset(); mExchange.reset(); + timer.stop(); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); @@ -117,33 +118,33 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. - if (dataDir.empty ()) + if (dataDir.empty()) { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " << uuid); } else { - string filename (dataDir + "/.mbrokerdata"); - ifstream inFile (filename.c_str ()); + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); - if (inFile.good ()) + if (inFile.good()) { inFile >> uuid; inFile >> bootSequence; inFile >> nextRemoteBank; - inFile.close (); + inFile.close(); QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); bootSequence++; - writeData (); + writeData(); } else { - uuid.generate (); + uuid.generate(); QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); - writeData (); + writeData(); } QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); @@ -155,10 +156,10 @@ void ManagementBroker::writeData () string filename (dataDir + "/.mbrokerdata"); ofstream outFile (filename.c_str ()); - if (outFile.good ()) + if (outFile.good()) { outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; - outFile.close (); + outFile.close(); } } @@ -174,7 +175,7 @@ void ManagementBroker::RegisterClass (string packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock (userLock); + Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = FindOrAddPackageLH(packageName); AddClass(pIter, className, md5Sum, schemaCall); } @@ -391,124 +392,64 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence SendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::dispatchCommand (Deliverable& deliverable, +bool ManagementBroker::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - if (routingKey.compare (0, 13, "agent.method.") == 0) - dispatchMethodLH (msg, routingKey, 13); + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.<X>.# + // broker.# + // + // where <X> is any non-negative decimal integer less than the lowest remote + // object-id bank. - else if (routingKey.length () == 5 && - routingKey.compare (0, 5, "agent") == 0) + if (routingKey == "broker") { dispatchAgentCommandLH (msg); - - else - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } -} - -void ManagementBroker::dispatchMethodLH (Message& msg, - const string& routingKey, - size_t first) -{ - size_t pos, start = first; - uint32_t contentSize; - - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); - return; + return false; } - string packageName = routingKey.substr (start, pos - start); - - start = pos + 1; - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); - return; + else if (routingKey.compare(0, 6, "agent.") == 0) { + uint32_t delim = routingKey.find('.', 6); + if (delim == string::npos) + delim = routingKey.length(); + string bank = routingKey.substr(6, delim - 6); + if ((uint32_t) atoi(bank.c_str()) <= localBank) { + dispatchAgentCommandLH (msg); + return false; + } } - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - string methodName = routingKey.substr (start, routingKey.length () - start); - - contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) - return; + return true; +} - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); +void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string methodName; Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen, sequence; - uint8_t opcode; - - if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " << - msg.encodedSize()); - return; - } - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - { - QPID_LOG (debug, " Invalid content header"); - return; - } - - if (opcode != 'M') - { - QPID_LOG (debug, " Unexpected opcode " << opcode); - return; - } - - uint64_t objId = inBuffer.getLongLong (); - string replyToKey; + uint32_t outLen; - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - { - QPID_LOG (debug, " Reply-to missing"); - return; - } + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); - EncodeHeader (outBuffer, 'm', sequence); + std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl; + EncodeHeader(outBuffer, 'm', sequence); - ManagementObjectMap::iterator iter = managementObjects.find (objId); - if (iter == managementObjects.end () || iter->second->isDeleted ()) - { + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } - else - { - iter->second->doMethod (methodName, inBuffer, outBuffer); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -737,6 +678,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe requestedBank = inBuffer.getLong (); assignedBank = assignBankLH (requestedBank); + // TODO: Make a pass over the agents and delete any that no longer have a session. + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); if (aIter != remoteAgents.end()) { @@ -755,6 +698,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_objectIdBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[sessionName] = agent; @@ -818,10 +762,9 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) const framing::MessageProperties* p = msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); } else return; @@ -832,10 +775,10 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) return; } - msg.encodeContent (inBuffer); - inBuffer.reset (); + msg.encodeContent(inBuffer); + inBuffer.reset(); - if (!CheckHeader (inBuffer, &opcode, &sequence)) + if (!CheckHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); @@ -847,6 +790,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); } ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 447720fb5e..89ea80b3b2 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -59,7 +59,7 @@ class ManagementBroker : public ManagementAgent uint32_t persistId = 0, uint32_t persistBank = 4); void clientAdded (void); - void dispatchCommand (broker::Deliverable& msg, + bool dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -177,9 +177,6 @@ class ManagementBroker : public ManagementAgent std::string routingKey); void moveNewObjectsLH(); - void dispatchMethodLH (broker::Message& msg, - const std::string& routingKey, - size_t first); void dispatchAgentCommandLH (broker::Message& msg); PackageMap::iterator FindOrAddPackageLH(std::string name); @@ -206,6 +203,7 @@ class ManagementBroker : public ManagementAgent void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); size_t ValidateSchema(framing::Buffer&); }; diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index b4824549ed..4ccf8e68c9 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -40,17 +40,16 @@ void ManagementExchange::route (Deliverable& msg, const string& routingKey, const FieldTable* args) { + bool routeIt = true; + // Intercept management agent commands - if ((routingKey.length () > 6 && - routingKey.substr (0, 6).compare ("agent.") == 0) || - (routingKey.length () == 5 && - routingKey.substr (0, 5).compare ("agent") == 0)) - { - managementAgent->dispatchCommand (msg, routingKey, args); - return; - } + if ((routingKey.length() > 6 && + routingKey.substr(0, 6).compare("agent.") == 0) || + (routingKey == "broker")) + routeIt = managementAgent->dispatchCommand(msg, routingKey, args); - TopicExchange::route (msg, routingKey, args); + if (routeIt) + TopicExchange::route(msg, routingKey, args); } bool ManagementExchange::bind (Queue::shared_ptr queue, diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 66adabf035..ce3051367d 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -98,8 +98,8 @@ class ManagementObject qpid::framing::Buffer& outBuf) = 0; virtual void setReference (uint64_t objectId); - virtual std::string getClassName (void) = 0; - virtual std::string getPackageName (void) = 0; + virtual std::string& getClassName (void) = 0; + virtual std::string& getPackageName (void) = 0; virtual uint8_t* getMd5Sum (void) = 0; void setObjectId (uint64_t oid) { objectId = oid; } |
