summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management/ManagementBroker.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-07-11 20:14:07 +0000
committerTed Ross <tross@apache.org>2008-07-11 20:14:07 +0000
commit2fd1b08b605d2664394ff5708c3cbaebd1dc21ef (patch)
treed0b5c7cfa8f31a1fc721fb45d7ca77a027875b7d /cpp/src/qpid/management/ManagementBroker.cpp
parent13e2db2a3d0d14881da3c088f084385740df0731 (diff)
downloadqpid-python-2fd1b08b605d2664394ff5708c3cbaebd1dc21ef.tar.gz
QPID-1174 Remote Management Agent for management of external components
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@676067 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp201
1 files changed, 163 insertions, 38 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 106033f76f..84e0c650f2 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea
nextObjectId = 1;
bootSequence = 1;
nextRemoteBank = 10;
+ nextRequestSequence = 1;
clientWasAdded = false;
// Get from file or generate and save to file.
@@ -155,8 +156,8 @@ void ManagementBroker::RegisterClass (string packageName,
ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock (userLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ AddClass(pIter, className, md5Sum, schemaCall);
}
uint64_t ManagementBroker::addObject (ManagementObject* object,
@@ -200,6 +201,17 @@ void ManagementBroker::clientAdded (void)
Mutex::ScopedLock lock (userLock);
clientWasAdded = true;
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+ aIter != remoteAgents.end();
+ aIter++) {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'x');
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+ }
}
void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -512,8 +524,12 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::handlePackageIndLH (Buffer& /*inBuffer*/, string /*replyToKey*/, uint32_t /*sequence*/)
+void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
{
+ std::string packageName;
+
+ inBuffer.getShortString(packageName);
+ FindOrAddPackageLH(packageName);
}
void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -529,7 +545,7 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey,
cIter != cMap.end ();
cIter++)
{
- if (cIter->second.hasSchema ())
+ if (cIter->second->hasSchema ())
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -546,16 +562,46 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey,
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::SchemaClass::appendSchema (Buffer& buf)
+void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
+{
+ std::string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
+
+ PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ ClassMap::iterator cIter = pIter->second.find(key);
+ if (cIter == pIter->second.end()) {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ uint32_t sequence = nextRequestSequence++;
+
+ EncodeHeader (outBuffer, 'S', sequence);
+ outBuffer.putShortString(packageName);
+ outBuffer.putShortString(key.name);
+ outBuffer.putBin128(key.hash);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+
+ SchemaClass* newSchema = new SchemaClass;
+ newSchema->pendingSequence = sequence;
+ pIter->second[key] = newSchema;
+ }
+}
+
+void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
{
// If the management package is attached locally (embedded in the broker or
// linked in via plug-in), call the schema handler directly. If the package
// is from a remote management agent, send the stored schema information.
if (writeSchemaCall != 0)
- writeSchemaCall (buf);
+ writeSchemaCall(buf);
else
- buf.putRawData (buffer, bufferLen);
+ buf.putRawData(buffer, bufferLen);
}
void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -568,22 +614,19 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
inBuffer.getBin128 (key.hash);
PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
+ if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- {
+ if (cIter != cMap.end()) {
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- SchemaClass classInfo = cIter->second;
+ SchemaClass* classInfo = cIter->second;
- if (classInfo.hasSchema())
- {
- EncodeHeader (outBuffer, 's', sequence);
- classInfo.appendSchema (outBuffer);
+ if (classInfo->hasSchema()) {
+ EncodeHeader(outBuffer, 's', sequence);
+ classInfo->appendSchema (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
+ outBuffer.reset();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
else
@@ -596,6 +639,44 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
sendCommandComplete (replyToKey, sequence, 1, "Package not found");
}
+void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.record();
+ inBuffer.getShortString (packageName);
+ inBuffer.getShortString (key.name);
+ inBuffer.getBin128 (key.hash);
+ inBuffer.restore();
+
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
+ size_t length = ValidateSchema(inBuffer);
+ if (length == 0)
+ cMap.erase(key);
+ else {
+ cIter->second->buffer = (uint8_t*) malloc(length);
+ cIter->second->bufferLen = length;
+ inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
+
+ // Publish a class-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'q');
+ EncodeClassIndication (outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ }
+ }
+ }
+}
+
bool ManagementBroker::bankInUse (uint32_t bank)
{
for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
@@ -628,16 +709,16 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- Uuid sessionId;
+ string sessionName;
Uuid systemId;
inBuffer.getShortString (label);
- sessionId.decode (inBuffer);
+ inBuffer.getShortString (sessionName);
systemId.decode (inBuffer);
requestedBank = inBuffer.getLong ();
assignedBank = assignBankLH (requestedBank);
- RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId);
+ RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
if (aIter != remoteAgents.end())
{
// There already exists an agent on this session. Reject the request.
@@ -645,17 +726,21 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
return;
}
+ // TODO: Reject requests for which the session name does not match an existing session.
+
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
+ agent->routingKey = replyToKey;
agent->mgmtObject = new management::Agent (this, agent);
- agent->mgmtObject->set_sessionId (sessionId);
+ agent->mgmtObject->set_sessionName (sessionName);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
addObject (agent->mgmtObject);
- remoteAgents[sessionId] = agent;
+ remoteAgents[sessionName] = agent;
+ // Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -734,16 +819,18 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
if (!CheckHeader (inBuffer, &opcode, &sequence))
return;
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
- //else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+ if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
}
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -767,10 +854,10 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::
return result.first;
}
-void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::AddClass(PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -785,12 +872,11 @@ void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
// No such class found, create a new class with local information.
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- SchemaClass classInfo;
+ SchemaClass* classInfo = new SchemaClass;
- classInfo.writeSchemaCall = schemaCall;
+ classInfo->writeSchemaCall = schemaCall;
cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
+ cIter = cMap.find (key);
}
void ManagementBroker::EncodePackageIndication (Buffer& buf,
@@ -810,3 +896,42 @@ void ManagementBroker::EncodeClassIndication (Buffer& buf,
buf.putBin128 (key.hash);
}
+size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+{
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
+
+ inBuffer.record();
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint16_t propCount = inBuffer.getShort();
+ uint16_t statCount = inBuffer.getShort();
+ uint16_t methCount = inBuffer.getShort();
+ uint16_t evntCount = inBuffer.getShort();
+
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+
+ for (uint16_t idx = 0; idx < methCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
+ }
+
+ if (evntCount != 0)
+ return 0;
+
+ end = inBuffer.getPosition();
+ inBuffer.restore(); // restore original position
+ return end - start;
+}