summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management/ManagementBroker.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-07 21:47:35 +0000
committerTed Ross <tross@apache.org>2008-10-07 21:47:35 +0000
commit9d199b74aee76859480a7ee92d95c6db42028b43 (patch)
treeca09aace4aaac2afa9650cc78833d30b056313a9 /cpp/src/qpid/management/ManagementBroker.cpp
parent41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (diff)
downloadqpid-python-9d199b74aee76859480a7ee92d95c6db42028b43.tar.gz
QPID-1327 - Event support for Management
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp328
1 files changed, 191 insertions, 137 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 3ae98e8264..0e046bb813 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -179,14 +179,24 @@ void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchang
dExchange = _dexchange;
}
-void ManagementBroker::RegisterClass (string packageName,
- string className,
+void ManagementBroker::registerClass (string& packageName,
+ string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(userLock);
- PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
- AddClass(pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
+}
+
+void ManagementBroker::registerEvent (string& packageName,
+ string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
ObjectId ManagementBroker::addObject (ManagementObject* object,
@@ -211,6 +221,23 @@ ObjectId ManagementBroker::addObject (ManagementObject* object,
return objId;
}
+void ManagementBroker::raiseEvent(const ManagementEvent& event)
+{
+ Mutex::ScopedLock lock (userLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ event.encode(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, mExchange, "mgmt.event");
+}
+
ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
: TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {}
@@ -219,7 +246,7 @@ ManagementBroker::Periodic::~Periodic () {}
void ManagementBroker::Periodic::fire ()
{
broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
- broker.PeriodicProcessing ();
+ broker.periodicProcessing ();
}
void ManagementBroker::clientAdded (void)
@@ -233,35 +260,35 @@ void ManagementBroker::clientAdded (void)
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'x');
+ encodeHeader (outBuffer, 'x');
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+ sendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
}
}
-void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
buf.putOctet ('M');
- buf.putOctet ('1');
+ buf.putOctet ('2');
buf.putOctet (opcode);
buf.putLong (seq);
}
-bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
- return h1 == 'A' && h2 == 'M' && h3 == '1';
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementBroker::SendBuffer (Buffer& buf,
+void ManagementBroker::sendBuffer (Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
string routingKey)
@@ -304,7 +331,7 @@ void ManagementBroker::moveNewObjectsLH()
newManagementObjects.clear();
}
-void ManagementBroker::PeriodicProcessing (void)
+void ManagementBroker::periodicProcessing (void)
{
#define BUFSIZE 65536
Mutex::ScopedLock lock (userLock);
@@ -315,13 +342,13 @@ void ManagementBroker::PeriodicProcessing (void)
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'h');
+ encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str() + ".heartbeat";
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
moveNewObjectsLH();
@@ -350,25 +377,25 @@ void ManagementBroker::PeriodicProcessing (void)
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
+ encodeHeader (msgBuffer, 'c');
object->writeProperties(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->getInstChanged ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
+ encodeHeader (msgBuffer, 'i');
object->writeStatistics(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->isDeleted ())
@@ -393,12 +420,12 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'z', sequence);
+ encodeHeader (outBuffer, 'z', sequence);
outBuffer.putLong (code);
outBuffer.putShortString (text);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
@@ -411,7 +438,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
// Parse the routing key. This management broker should act as though it
// is bound to the exchange to match the following keys:
//
- // agent.0.#
+ // agent.1.0.#
// broker
if (routingKey == "broker") {
@@ -419,12 +446,12 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
return false;
}
- else if (routingKey.compare(0, 7, "agent.0") == 0) {
+ else if (routingKey.compare(0, 9, "agent.1.0") == 0) {
dispatchAgentCommandLH(msg);
return false;
}
- else if (routingKey.compare(0, 6, "agent.") == 0) {
+ else if (routingKey.compare(0, 8, "agent.1.") == 0) {
return authorizeAgentMessageLH(msg);
}
@@ -447,7 +474,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
if (acl != 0) {
string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
@@ -460,7 +487,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe
outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
return;
}
}
@@ -476,12 +503,19 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
}
else
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ try {
+ outBuffer.record();
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
+ } catch(std::exception& e) {
+ outBuffer.restore();
+ outBuffer.putLong(Manageable::STATUS_EXCEPTION);
+ outBuffer.putShortString(e.what());
+ }
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -489,12 +523,12 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'b', sequence);
+ encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -506,11 +540,11 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p', sequence);
- EncodePackageIndication (outBuffer, pIter);
+ encodeHeader (outBuffer, 'p', sequence);
+ encodePackageIndication (outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
sendCommandComplete (replyToKey, sequence);
@@ -521,7 +555,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey
std::string packageName;
inBuffer.getShortString(packageName);
- FindOrAddPackageLH(packageName);
+ findOrAddPackageLH(packageName);
}
void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -542,11 +576,11 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'q', sequence);
- EncodeClassIndication(outBuffer, pIter, cIter);
+ encodeHeader(outBuffer, 'q', sequence);
+ encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
}
}
@@ -558,26 +592,27 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui
std::string packageName;
SchemaClassKey key;
+ uint8_t kind = inBuffer.getOctet();
inBuffer.getShortString(packageName);
inBuffer.getShortString(key.name);
inBuffer.getBin128(key.hash);
- PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
ClassMap::iterator cIter = pIter->second.find(key);
if (cIter == pIter->second.end()) {
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
uint32_t sequence = nextRequestSequence++;
- EncodeHeader (outBuffer, 'S', sequence);
+ encodeHeader (outBuffer, 'S', sequence);
outBuffer.putShortString(packageName);
outBuffer.putShortString(key.name);
outBuffer.putBin128(key.hash);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence)));
+ pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence)));
}
}
@@ -612,11 +647,11 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey
SchemaClass& classInfo = cIter->second;
if (classInfo.hasSchema()) {
- EncodeHeader(outBuffer, 's', sequence);
+ encodeHeader(outBuffer, 's', sequence);
classInfo.appendSchema(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
else
sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
@@ -634,9 +669,10 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
SchemaClassKey key;
inBuffer.record();
- inBuffer.getShortString (packageName);
- inBuffer.getShortString (key.name);
- inBuffer.getBin128 (key.hash);
+ inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
inBuffer.restore();
PackageMap::iterator pIter = packages.find(packageName);
@@ -644,7 +680,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
- size_t length = ValidateSchema(inBuffer);
+ size_t length = validateSchema(inBuffer, cIter->second.kind);
if (length == 0) {
QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
@@ -658,11 +694,11 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'q');
- EncodeClassIndication(outBuffer, pIter, cIter);
+ encodeHeader(outBuffer, 'q');
+ encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ sendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
}
}
}
@@ -727,7 +763,7 @@ void ManagementBroker::deleteOrphanedAgentsLH()
void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
- uint32_t requestedBank;
+ uint32_t requestedBrokerBank, requestedAgentBank;
uint32_t assignedBank;
ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
@@ -737,14 +773,15 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
if (aIter != remoteAgents.end()) {
// There already exists an agent on this session. Reject the request.
- sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent");
+ sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
return;
}
- inBuffer.getShortString (label);
- systemId.decode (inBuffer);
- requestedBank = inBuffer.getLong ();
- assignedBank = assignBankLH (requestedBank);
+ inBuffer.getShortString(label);
+ systemId.decode(inBuffer);
+ requestedBrokerBank = inBuffer.getLong();
+ requestedAgentBank = inBuffer.getLong();
+ assignedBank = assignBankLH(requestedAgentBank);
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
@@ -755,7 +792,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
- agent->mgmtObject->set_objectIdBank (assignedBank);
+ agent->mgmtObject->set_brokerBank (brokerBank);
+ agent->mgmtObject->set_agentBank (assignedBank);
addObject (agent->mgmtObject);
remoteAgents[connectionRef] = agent;
@@ -764,12 +802,12 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'a', sequence);
+ encodeHeader (outBuffer, 'a', sequence);
outBuffer.putLong (brokerBank);
outBuffer.putLong (assignedBank);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -799,12 +837,12 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'g', sequence);
+ encodeHeader (outBuffer, 'g', sequence);
object->writeProperties(outBuffer);
object->writeStatistics(outBuffer, true);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
}
@@ -824,7 +862,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
msg.encodeContent(inBuffer);
inBuffer.reset();
- if (!CheckHeader(inBuffer, &opcode, &sequence))
+ if (!checkHeader(inBuffer, &opcode, &sequence))
return false;
if (opcode == 'M') {
@@ -861,12 +899,12 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
return false;
@@ -900,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg)
msg.encodeContent(inBuffer);
inBuffer.reset();
- if (!CheckHeader(inBuffer, &opcode, &sequence))
+ if (!checkHeader(inBuffer, &opcode, &sequence))
return;
if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
@@ -915,7 +953,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg)
else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
}
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(std::string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -930,19 +968,20 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std:
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
+ encodeHeader (outBuffer, 'p');
+ encodePackageIndication (outBuffer, result.first);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
+ sendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
return result.first;
}
-void ManagementBroker::AddClass(PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::addClassLH(uint8_t kind,
+ PackageMap::iterator pIter,
+ string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -958,71 +997,76 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter,
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall)));
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
cIter = cMap.find(key);
}
-void ManagementBroker::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
+void ManagementBroker::encodePackageIndication(Buffer& buf,
+ PackageMap::iterator pIter)
{
- buf.putShortString ((*pIter).first);
+ buf.putShortString((*pIter).first);
}
-void ManagementBroker::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
+void ManagementBroker::encodeClassIndication(Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
{
SchemaClassKey key = (*cIter).first;
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
+ buf.putOctet((*cIter).second.kind);
+ buf.putShortString((*pIter).first);
+ buf.putShortString(key.name);
+ buf.putBin128(key.hash);
+}
+
+size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind)
+{
+ if (kind == ManagementItem::CLASS_KIND_TABLE)
+ return validateTableSchema(inBuffer);
+ else if (kind == ManagementItem::CLASS_KIND_EVENT)
+ return validateEventSchema(inBuffer);
+ return 0;
}
-size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+size_t ManagementBroker::validateTableSchema(Buffer& inBuffer)
{
uint32_t start = inBuffer.getPosition();
uint32_t end;
string text;
uint8_t hash[16];
- inBuffer.record();
- inBuffer.getShortString(text);
- inBuffer.getShortString(text);
- inBuffer.getBin128(hash);
+ try {
+ inBuffer.record();
+ uint8_t kind = inBuffer.getOctet();
+ if (kind != ManagementItem::CLASS_KIND_TABLE)
+ return 0;
- uint16_t propCount = inBuffer.getShort();
- uint16_t statCount = inBuffer.getShort();
- uint16_t methCount = inBuffer.getShort();
- uint16_t evntCount = inBuffer.getShort();
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
- for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- }
+ uint16_t propCount = inBuffer.getShort();
+ uint16_t statCount = inBuffer.getShort();
+ uint16_t methCount = inBuffer.getShort();
- for (uint16_t idx = 0; idx < methCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- if (!ft.isSet("argCount"))
- return 0;
- int argCount = ft.getInt("argCount");
- for (int mIdx = 0; mIdx < argCount; mIdx++) {
- FieldTable aft;
- aft.decode(inBuffer);
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
}
- }
- for (uint16_t idx = 0; idx < evntCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- if (!ft.isSet("argCount"))
- return 0;
- int argCount = ft.getInt("argCount");
- for (int mIdx = 0; mIdx < argCount; mIdx++) {
- FieldTable aft;
- aft.decode(inBuffer);
+ for (uint16_t idx = 0; idx < methCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
}
+ } catch (std::exception& e) {
+ return 0;
}
end = inBuffer.getPosition();
@@ -1030,24 +1074,34 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
return end - start;
}
-Mutex& ManagementBroker::getMutex()
+size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
{
- return userLock;
-}
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
-Buffer* ManagementBroker::startEventLH()
-{
- Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
- EncodeHeader(*outBuffer, 'e');
- outBuffer->putLongLong(uint64_t(Duration(now())));
- return outBuffer;
-}
+ try {
+ inBuffer.record();
+ uint8_t kind = inBuffer.getOctet();
+ if (kind != ManagementItem::CLASS_KIND_EVENT)
+ return 0;
-void ManagementBroker::finishEventLH(Buffer* outBuffer)
-{
- uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
- outBuffer->reset();
- SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event");
- delete outBuffer;
-}
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint16_t argCount = inBuffer.getShort();
+ for (uint16_t idx = 0; idx < argCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+ } catch (std::exception& e) {
+ return 0;
+ }
+
+ end = inBuffer.getPosition();
+ inBuffer.restore(); // restore original position
+ return end - start;
+}