summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management/ManagementBroker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp176
1 files changed, 60 insertions, 116 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index f66b34c43c..223811ebc2 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -97,6 +97,7 @@ ManagementBroker::~ManagementBroker ()
// objects that will be invalid.
dExchange.reset();
mExchange.reset();
+ timer.stop();
moveNewObjectsLH();
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -117,33 +118,33 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
// Get from file or generate and save to file.
- if (dataDir.empty ())
+ if (dataDir.empty())
{
- uuid.generate ();
+ uuid.generate();
QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
<< uuid);
}
else
{
- string filename (dataDir + "/.mbrokerdata");
- ifstream inFile (filename.c_str ());
+ string filename(dataDir + "/.mbrokerdata");
+ ifstream inFile(filename.c_str ());
- if (inFile.good ())
+ if (inFile.good())
{
inFile >> uuid;
inFile >> bootSequence;
inFile >> nextRemoteBank;
- inFile.close ();
+ inFile.close();
QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
bootSequence++;
- writeData ();
+ writeData();
}
else
{
- uuid.generate ();
+ uuid.generate();
QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
- writeData ();
+ writeData();
}
QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
@@ -155,10 +156,10 @@ void ManagementBroker::writeData ()
string filename (dataDir + "/.mbrokerdata");
ofstream outFile (filename.c_str ());
- if (outFile.good ())
+ if (outFile.good())
{
outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
- outFile.close ();
+ outFile.close();
}
}
@@ -174,7 +175,7 @@ void ManagementBroker::RegisterClass (string packageName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
- Mutex::ScopedLock lock (userLock);
+ Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
AddClass(pIter, className, md5Sum, schemaCall);
}
@@ -391,124 +392,64 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::dispatchCommand (Deliverable& deliverable,
+bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
const FieldTable* /*args*/)
{
Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
- if (routingKey.compare (0, 13, "agent.method.") == 0)
- dispatchMethodLH (msg, routingKey, 13);
+ // Parse the routing key. This management broker should act as though it
+ // is bound to the exchange to match the following keys:
+ //
+ // agent.<X>.#
+ // broker.#
+ //
+ // where <X> is any non-negative decimal integer less than the lowest remote
+ // object-id bank.
- else if (routingKey.length () == 5 &&
- routingKey.compare (0, 5, "agent") == 0)
+ if (routingKey == "broker") {
dispatchAgentCommandLH (msg);
-
- else
- {
- QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
- return;
- }
-}
-
-void ManagementBroker::dispatchMethodLH (Message& msg,
- const string& routingKey,
- size_t first)
-{
- size_t pos, start = first;
- uint32_t contentSize;
-
- if (routingKey.length () == start)
- {
- QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
- return;
- }
-
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
- return;
+ return false;
}
- string packageName = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
- return;
+ else if (routingKey.compare(0, 6, "agent.") == 0) {
+ uint32_t delim = routingKey.find('.', 6);
+ if (delim == string::npos)
+ delim = routingKey.length();
+ string bank = routingKey.substr(6, delim - 6);
+ if ((uint32_t) atoi(bank.c_str()) <= localBank) {
+ dispatchAgentCommandLH (msg);
+ return false;
+ }
}
- string className = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- string methodName = routingKey.substr (start, routingKey.length () - start);
-
- contentSize = msg.encodedContentSize ();
- if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
- return;
+ return true;
+}
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ string methodName;
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen, sequence;
- uint8_t opcode;
-
- if (msg.encodedSize() > MA_BUFFER_SIZE) {
- QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " <<
- msg.encodedSize());
- return;
- }
-
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
-
- if (!CheckHeader (inBuffer, &opcode, &sequence))
- {
- QPID_LOG (debug, " Invalid content header");
- return;
- }
-
- if (opcode != 'M')
- {
- QPID_LOG (debug, " Unexpected opcode " << opcode);
- return;
- }
-
- uint64_t objId = inBuffer.getLongLong ();
- string replyToKey;
+ uint32_t outLen;
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
- }
- else
- {
- QPID_LOG (debug, " Reply-to missing");
- return;
- }
+ uint64_t objId = inBuffer.getLongLong();
+ inBuffer.getShortString(methodName);
- EncodeHeader (outBuffer, 'm', sequence);
+ std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl;
+ EncodeHeader(outBuffer, 'm', sequence);
- ManagementObjectMap::iterator iter = managementObjects.find (objId);
- if (iter == managementObjects.end () || iter->second->isDeleted ())
- {
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
- }
- else
- {
- iter->second->doMethod (methodName, inBuffer, outBuffer);
+ } else {
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
}
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -737,6 +678,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
requestedBank = inBuffer.getLong ();
assignedBank = assignBankLH (requestedBank);
+ // TODO: Make a pass over the agents and delete any that no longer have a session.
+
RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
if (aIter != remoteAgents.end())
{
@@ -755,6 +698,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
+ agent->mgmtObject->set_objectIdBank (assignedBank);
addObject (agent->mgmtObject);
remoteAgents[sessionName] = agent;
@@ -818,10 +762,9 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
+ if (p && p->hasReplyTo()) {
+ const framing::ReplyTo& rt = p->getReplyTo();
+ replyToKey = rt.getRoutingKey();
}
else
return;
@@ -832,10 +775,10 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
return;
}
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
+ msg.encodeContent(inBuffer);
+ inBuffer.reset();
- if (!CheckHeader (inBuffer, &opcode, &sequence))
+ if (!CheckHeader(inBuffer, &opcode, &sequence))
return;
if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
@@ -847,6 +790,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
}
ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)