summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-07-31 13:15:16 +0000
committerTed Ross <tross@apache.org>2008-07-31 13:15:16 +0000
commit9fb5fd7a0a800591c334bde2b9556e984217d7de (patch)
treec632c481f9cbf647d4ce453ff1076895866fc5e7 /cpp/src/qpid/management
parent033f088884f2e6bbc08d6027e1507b6d67eaad53 (diff)
downloadqpid-python-9fb5fd7a0a800591c334bde2b9556e984217d7de.tar.gz
QPID-1174 - Management updates for remote agents
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681362 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp176
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h6
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp17
-rw-r--r--cpp/src/qpid/management/ManagementObject.h4
4 files changed, 72 insertions, 131 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index f66b34c43c..223811ebc2 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -97,6 +97,7 @@ ManagementBroker::~ManagementBroker ()
// objects that will be invalid.
dExchange.reset();
mExchange.reset();
+ timer.stop();
moveNewObjectsLH();
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -117,33 +118,33 @@ void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
// Get from file or generate and save to file.
- if (dataDir.empty ())
+ if (dataDir.empty())
{
- uuid.generate ();
+ uuid.generate();
QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
<< uuid);
}
else
{
- string filename (dataDir + "/.mbrokerdata");
- ifstream inFile (filename.c_str ());
+ string filename(dataDir + "/.mbrokerdata");
+ ifstream inFile(filename.c_str ());
- if (inFile.good ())
+ if (inFile.good())
{
inFile >> uuid;
inFile >> bootSequence;
inFile >> nextRemoteBank;
- inFile.close ();
+ inFile.close();
QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
bootSequence++;
- writeData ();
+ writeData();
}
else
{
- uuid.generate ();
+ uuid.generate();
QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
- writeData ();
+ writeData();
}
QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
@@ -155,10 +156,10 @@ void ManagementBroker::writeData ()
string filename (dataDir + "/.mbrokerdata");
ofstream outFile (filename.c_str ());
- if (outFile.good ())
+ if (outFile.good())
{
outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
- outFile.close ();
+ outFile.close();
}
}
@@ -174,7 +175,7 @@ void ManagementBroker::RegisterClass (string packageName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
- Mutex::ScopedLock lock (userLock);
+ Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
AddClass(pIter, className, md5Sum, schemaCall);
}
@@ -391,124 +392,64 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::dispatchCommand (Deliverable& deliverable,
+bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
const FieldTable* /*args*/)
{
Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
- if (routingKey.compare (0, 13, "agent.method.") == 0)
- dispatchMethodLH (msg, routingKey, 13);
+ // Parse the routing key. This management broker should act as though it
+ // is bound to the exchange to match the following keys:
+ //
+ // agent.<X>.#
+ // broker.#
+ //
+ // where <X> is any non-negative decimal integer less than the lowest remote
+ // object-id bank.
- else if (routingKey.length () == 5 &&
- routingKey.compare (0, 5, "agent") == 0)
+ if (routingKey == "broker") {
dispatchAgentCommandLH (msg);
-
- else
- {
- QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
- return;
- }
-}
-
-void ManagementBroker::dispatchMethodLH (Message& msg,
- const string& routingKey,
- size_t first)
-{
- size_t pos, start = first;
- uint32_t contentSize;
-
- if (routingKey.length () == start)
- {
- QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
- return;
- }
-
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
- return;
+ return false;
}
- string packageName = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
- return;
+ else if (routingKey.compare(0, 6, "agent.") == 0) {
+ uint32_t delim = routingKey.find('.', 6);
+ if (delim == string::npos)
+ delim = routingKey.length();
+ string bank = routingKey.substr(6, delim - 6);
+ if ((uint32_t) atoi(bank.c_str()) <= localBank) {
+ dispatchAgentCommandLH (msg);
+ return false;
+ }
}
- string className = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- string methodName = routingKey.substr (start, routingKey.length () - start);
-
- contentSize = msg.encodedContentSize ();
- if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
- return;
+ return true;
+}
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ string methodName;
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen, sequence;
- uint8_t opcode;
-
- if (msg.encodedSize() > MA_BUFFER_SIZE) {
- QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " <<
- msg.encodedSize());
- return;
- }
-
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
-
- if (!CheckHeader (inBuffer, &opcode, &sequence))
- {
- QPID_LOG (debug, " Invalid content header");
- return;
- }
-
- if (opcode != 'M')
- {
- QPID_LOG (debug, " Unexpected opcode " << opcode);
- return;
- }
-
- uint64_t objId = inBuffer.getLongLong ();
- string replyToKey;
+ uint32_t outLen;
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
- }
- else
- {
- QPID_LOG (debug, " Reply-to missing");
- return;
- }
+ uint64_t objId = inBuffer.getLongLong();
+ inBuffer.getShortString(methodName);
- EncodeHeader (outBuffer, 'm', sequence);
+ std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl;
+ EncodeHeader(outBuffer, 'm', sequence);
- ManagementObjectMap::iterator iter = managementObjects.find (objId);
- if (iter == managementObjects.end () || iter->second->isDeleted ())
- {
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
- }
- else
- {
- iter->second->doMethod (methodName, inBuffer, outBuffer);
+ } else {
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
}
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -737,6 +678,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
requestedBank = inBuffer.getLong ();
assignedBank = assignBankLH (requestedBank);
+ // TODO: Make a pass over the agents and delete any that no longer have a session.
+
RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
if (aIter != remoteAgents.end())
{
@@ -755,6 +698,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
+ agent->mgmtObject->set_objectIdBank (assignedBank);
addObject (agent->mgmtObject);
remoteAgents[sessionName] = agent;
@@ -818,10 +762,9 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
+ if (p && p->hasReplyTo()) {
+ const framing::ReplyTo& rt = p->getReplyTo();
+ replyToKey = rt.getRoutingKey();
}
else
return;
@@ -832,10 +775,10 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
return;
}
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
+ msg.encodeContent(inBuffer);
+ inBuffer.reset();
- if (!CheckHeader (inBuffer, &opcode, &sequence))
+ if (!CheckHeader(inBuffer, &opcode, &sequence))
return;
if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
@@ -847,6 +790,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
}
ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
index 447720fb5e..89ea80b3b2 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -59,7 +59,7 @@ class ManagementBroker : public ManagementAgent
uint32_t persistId = 0,
uint32_t persistBank = 4);
void clientAdded (void);
- void dispatchCommand (broker::Deliverable& msg,
+ bool dispatchCommand (broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
@@ -177,9 +177,6 @@ class ManagementBroker : public ManagementAgent
std::string routingKey);
void moveNewObjectsLH();
- void dispatchMethodLH (broker::Message& msg,
- const std::string& routingKey,
- size_t first);
void dispatchAgentCommandLH (broker::Message& msg);
PackageMap::iterator FindOrAddPackageLH(std::string name);
@@ -206,6 +203,7 @@ class ManagementBroker : public ManagementAgent
void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
size_t ValidateSchema(framing::Buffer&);
};
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
index b4824549ed..4ccf8e68c9 100644
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -40,17 +40,16 @@ void ManagementExchange::route (Deliverable& msg,
const string& routingKey,
const FieldTable* args)
{
+ bool routeIt = true;
+
// Intercept management agent commands
- if ((routingKey.length () > 6 &&
- routingKey.substr (0, 6).compare ("agent.") == 0) ||
- (routingKey.length () == 5 &&
- routingKey.substr (0, 5).compare ("agent") == 0))
- {
- managementAgent->dispatchCommand (msg, routingKey, args);
- return;
- }
+ if ((routingKey.length() > 6 &&
+ routingKey.substr(0, 6).compare("agent.") == 0) ||
+ (routingKey == "broker"))
+ routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
- TopicExchange::route (msg, routingKey, args);
+ if (routeIt)
+ TopicExchange::route(msg, routingKey, args);
}
bool ManagementExchange::bind (Queue::shared_ptr queue,
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 66adabf035..ce3051367d 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -98,8 +98,8 @@ class ManagementObject
qpid::framing::Buffer& outBuf) = 0;
virtual void setReference (uint64_t objectId);
- virtual std::string getClassName (void) = 0;
- virtual std::string getPackageName (void) = 0;
+ virtual std::string& getClassName (void) = 0;
+ virtual std::string& getPackageName (void) = 0;
virtual uint8_t* getMd5Sum (void) = 0;
void setObjectId (uint64_t oid) { objectId = oid; }