summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp201
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h44
2 files changed, 188 insertions, 57 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 106033f76f..84e0c650f2 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -55,6 +55,7 @@ ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Managea
nextObjectId = 1;
bootSequence = 1;
nextRemoteBank = 10;
+ nextRequestSequence = 1;
clientWasAdded = false;
// Get from file or generate and save to file.
@@ -155,8 +156,8 @@ void ManagementBroker::RegisterClass (string packageName,
ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock (userLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ AddClass(pIter, className, md5Sum, schemaCall);
}
uint64_t ManagementBroker::addObject (ManagementObject* object,
@@ -200,6 +201,17 @@ void ManagementBroker::clientAdded (void)
Mutex::ScopedLock lock (userLock);
clientWasAdded = true;
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+ aIter != remoteAgents.end();
+ aIter++) {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'x');
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+ }
}
void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -512,8 +524,12 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::handlePackageIndLH (Buffer& /*inBuffer*/, string /*replyToKey*/, uint32_t /*sequence*/)
+void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
{
+ std::string packageName;
+
+ inBuffer.getShortString(packageName);
+ FindOrAddPackageLH(packageName);
}
void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -529,7 +545,7 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey,
cIter != cMap.end ();
cIter++)
{
- if (cIter->second.hasSchema ())
+ if (cIter->second->hasSchema ())
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -546,16 +562,46 @@ void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey,
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::SchemaClass::appendSchema (Buffer& buf)
+void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
+{
+ std::string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
+
+ PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ ClassMap::iterator cIter = pIter->second.find(key);
+ if (cIter == pIter->second.end()) {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ uint32_t sequence = nextRequestSequence++;
+
+ EncodeHeader (outBuffer, 'S', sequence);
+ outBuffer.putShortString(packageName);
+ outBuffer.putShortString(key.name);
+ outBuffer.putBin128(key.hash);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+
+ SchemaClass* newSchema = new SchemaClass;
+ newSchema->pendingSequence = sequence;
+ pIter->second[key] = newSchema;
+ }
+}
+
+void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
{
// If the management package is attached locally (embedded in the broker or
// linked in via plug-in), call the schema handler directly. If the package
// is from a remote management agent, send the stored schema information.
if (writeSchemaCall != 0)
- writeSchemaCall (buf);
+ writeSchemaCall(buf);
else
- buf.putRawData (buffer, bufferLen);
+ buf.putRawData(buffer, bufferLen);
}
void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -568,22 +614,19 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
inBuffer.getBin128 (key.hash);
PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
+ if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- {
+ if (cIter != cMap.end()) {
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- SchemaClass classInfo = cIter->second;
+ SchemaClass* classInfo = cIter->second;
- if (classInfo.hasSchema())
- {
- EncodeHeader (outBuffer, 's', sequence);
- classInfo.appendSchema (outBuffer);
+ if (classInfo->hasSchema()) {
+ EncodeHeader(outBuffer, 's', sequence);
+ classInfo->appendSchema (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
+ outBuffer.reset();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
else
@@ -596,6 +639,44 @@ void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKe
sendCommandComplete (replyToKey, sequence, 1, "Package not found");
}
+void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.record();
+ inBuffer.getShortString (packageName);
+ inBuffer.getShortString (key.name);
+ inBuffer.getBin128 (key.hash);
+ inBuffer.restore();
+
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
+ size_t length = ValidateSchema(inBuffer);
+ if (length == 0)
+ cMap.erase(key);
+ else {
+ cIter->second->buffer = (uint8_t*) malloc(length);
+ cIter->second->bufferLen = length;
+ inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
+
+ // Publish a class-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'q');
+ EncodeClassIndication (outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ }
+ }
+ }
+}
+
bool ManagementBroker::bankInUse (uint32_t bank)
{
for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
@@ -628,16 +709,16 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- Uuid sessionId;
+ string sessionName;
Uuid systemId;
inBuffer.getShortString (label);
- sessionId.decode (inBuffer);
+ inBuffer.getShortString (sessionName);
systemId.decode (inBuffer);
requestedBank = inBuffer.getLong ();
assignedBank = assignBankLH (requestedBank);
- RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId);
+ RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
if (aIter != remoteAgents.end())
{
// There already exists an agent on this session. Reject the request.
@@ -645,17 +726,21 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
return;
}
+ // TODO: Reject requests for which the session name does not match an existing session.
+
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
+ agent->routingKey = replyToKey;
agent->mgmtObject = new management::Agent (this, agent);
- agent->mgmtObject->set_sessionId (sessionId);
+ agent->mgmtObject->set_sessionName (sessionName);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
addObject (agent->mgmtObject);
- remoteAgents[sessionId] = agent;
+ remoteAgents[sessionName] = agent;
+ // Send an Attach Response
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -734,16 +819,18 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg)
if (!CheckHeader (inBuffer, &opcode, &sequence))
return;
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
- //else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+ if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
}
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -767,10 +854,10 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::
return result.first;
}
-void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::AddClass(PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -785,12 +872,11 @@ void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
// No such class found, create a new class with local information.
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- SchemaClass classInfo;
+ SchemaClass* classInfo = new SchemaClass;
- classInfo.writeSchemaCall = schemaCall;
+ classInfo->writeSchemaCall = schemaCall;
cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
+ cIter = cMap.find (key);
}
void ManagementBroker::EncodePackageIndication (Buffer& buf,
@@ -810,3 +896,42 @@ void ManagementBroker::EncodeClassIndication (Buffer& buf,
buf.putBin128 (key.hash);
}
+size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+{
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
+
+ inBuffer.record();
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint16_t propCount = inBuffer.getShort();
+ uint16_t statCount = inBuffer.getShort();
+ uint16_t methCount = inBuffer.getShort();
+ uint16_t evntCount = inBuffer.getShort();
+
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+
+ for (uint16_t idx = 0; idx < methCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
+ }
+
+ if (evntCount != 0)
+ return 0;
+
+ end = inBuffer.getPosition();
+ inBuffer.restore(); // restore original position
+ return end - start;
+}
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
index 5e9114c3f4..685b7db977 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -89,6 +89,7 @@ class ManagementBroker : public ManagementAgent
struct RemoteAgent : public Manageable
{
uint32_t objIdBank;
+ std::string routingKey;
Agent* mgmtObject;
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
@@ -97,8 +98,8 @@ class ManagementBroker : public ManagementAgent
// TODO: Eventually replace string with entire reply-to structure. reply-to
// currently assumes that the exchange is "amq.direct" even though it could
// in theory be specified differently.
- typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap;
- typedef std::vector<std::string> ReplyToVector;
+ typedef std::map<std::string, RemoteAgent*> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
// Storage for known schema classes:
//
@@ -129,16 +130,16 @@ class ManagementBroker : public ManagementAgent
struct SchemaClass
{
ManagementObject::writeSchemaCall_t writeSchemaCall;
- ReplyToVector remoteAgents;
- size_t bufferLen;
- uint8_t* buffer;
+ uint32_t pendingSequence;
+ size_t bufferLen;
+ uint8_t* buffer;
- SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {}
+ SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
void appendSchema (framing::Buffer& buf);
};
- typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
RemoteAgentMap remoteAgents;
@@ -162,6 +163,7 @@ class ManagementBroker : public ManagementAgent
uint32_t localBank;
uint32_t nextObjectId;
uint32_t nextRemoteBank;
+ uint32_t nextRequestSequence;
bool clientWasAdded;
# define MA_BUFFER_SIZE 65536
@@ -183,11 +185,11 @@ class ManagementBroker : public ManagementAgent
size_t first);
void dispatchAgentCommandLH (broker::Message& msg);
- PackageMap::iterator FindOrAddPackage (std::string name);
- void AddClassLocal (PackageMap::iterator pIter,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
+ PackageMap::iterator FindOrAddPackageLH(std::string name);
+ void AddClass(PackageMap::iterator pIter,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
void EncodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
void EncodeClassIndication (framing::Buffer& buf,
@@ -198,13 +200,17 @@ class ManagementBroker : public ManagementAgent
uint32_t assignBankLH (uint32_t requestedPrefix);
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
- void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+
+ size_t ValidateSchema(framing::Buffer&);
};
}}