diff options
| author | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
| commit | 9d199b74aee76859480a7ee92d95c6db42028b43 (patch) | |
| tree | ca09aace4aaac2afa9650cc78833d30b056313a9 /cpp/src/qpid/management | |
| parent | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (diff) | |
| download | qpid-python-9d199b74aee76859480a7ee92d95c6db42028b43.tar.gz | |
QPID-1327 - Event support for Management
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/Manageable.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 328 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 60 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementEvent.h | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 30 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 58 |
6 files changed, 321 insertions, 204 deletions
diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h index afa5c968f7..b4d80d8fad 100644 --- a/cpp/src/qpid/management/Manageable.h +++ b/cpp/src/qpid/management/Manageable.h @@ -45,6 +45,7 @@ class Manageable static const status_t STATUS_INVALID_PARAMETER = 4; static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5; static const status_t STATUS_FORBIDDEN = 6; + static const status_t STATUS_EXCEPTION = 7; static const status_t STATUS_USER = 0x00010000; // Every "Manageable" object must hold a reference to exactly one diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 3ae98e8264..0e046bb813 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -179,14 +179,24 @@ void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchang dExchange = _dexchange; } -void ManagementBroker::RegisterClass (string packageName, - string className, +void ManagementBroker::registerClass (string& packageName, + string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(userLock); - PackageMap::iterator pIter = FindOrAddPackageLH(packageName); - AddClass(pIter, className, md5Sum, schemaCall); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); +} + +void ManagementBroker::registerEvent (string& packageName, + string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } ObjectId ManagementBroker::addObject (ManagementObject* object, @@ -211,6 +221,23 @@ ObjectId ManagementBroker::addObject (ManagementObject* object, return objId; } +void ManagementBroker::raiseEvent(const ManagementEvent& event) +{ + Mutex::ScopedLock lock (userLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + event.encode(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, "mgmt.event"); +} + ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {} @@ -219,7 +246,7 @@ ManagementBroker::Periodic::~Periodic () {} void ManagementBroker::Periodic::fire () { broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval))); - broker.PeriodicProcessing (); + broker.periodicProcessing (); } void ManagementBroker::clientAdded (void) @@ -233,35 +260,35 @@ void ManagementBroker::clientAdded (void) Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'x'); + encodeHeader (outBuffer, 'x'); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); + sendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey); } } -void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); buf.putOctet ('M'); - buf.putOctet ('1'); + buf.putOctet ('2'); buf.putOctet (opcode); buf.putLong (seq); } -bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) { - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); - *opcode = buf.getOctet (); - *seq = buf.getLong (); + *opcode = buf.getOctet(); + *seq = buf.getLong(); - return h1 == 'A' && h2 == 'M' && h3 == '1'; + return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementBroker::SendBuffer (Buffer& buf, +void ManagementBroker::sendBuffer (Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, string routingKey) @@ -304,7 +331,7 @@ void ManagementBroker::moveNewObjectsLH() newManagementObjects.clear(); } -void ManagementBroker::PeriodicProcessing (void) +void ManagementBroker::periodicProcessing (void) { #define BUFSIZE 65536 Mutex::ScopedLock lock (userLock); @@ -315,13 +342,13 @@ void ManagementBroker::PeriodicProcessing (void) { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'h'); + encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str() + ".heartbeat"; - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } moveNewObjectsLH(); @@ -350,25 +377,25 @@ void ManagementBroker::PeriodicProcessing (void) if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); + encodeHeader (msgBuffer, 'c'); object->writeProperties(msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); + encodeHeader (msgBuffer, 'i'); object->writeStatistics(msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); } if (object->isDeleted ()) @@ -393,12 +420,12 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'z', sequence); + encodeHeader (outBuffer, 'z', sequence); outBuffer.putLong (code); outBuffer.putShortString (text); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } bool ManagementBroker::dispatchCommand (Deliverable& deliverable, @@ -411,7 +438,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, // Parse the routing key. This management broker should act as though it // is bound to the exchange to match the following keys: // - // agent.0.# + // agent.1.0.# // broker if (routingKey == "broker") { @@ -419,12 +446,12 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, return false; } - else if (routingKey.compare(0, 7, "agent.0") == 0) { + else if (routingKey.compare(0, 9, "agent.1.0") == 0) { dispatchAgentCommandLH(msg); return false; } - else if (routingKey.compare(0, 6, "agent.") == 0) { + else if (routingKey.compare(0, 8, "agent.1.") == 0) { return authorizeAgentMessageLH(msg); } @@ -447,7 +474,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe inBuffer.getShortString(className); inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); if (acl != 0) { string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); @@ -460,7 +487,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); return; } } @@ -476,12 +503,19 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); } else - iter->second->doMethod(methodName, inBuffer, outBuffer); + try { + outBuffer.record(); + iter->second->doMethod(methodName, inBuffer, outBuffer); + } catch(std::exception& e) { + outBuffer.restore(); + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putShortString(e.what()); + } } outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) @@ -489,12 +523,12 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32 Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'b', sequence); + encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) @@ -506,11 +540,11 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p', sequence); - EncodePackageIndication (outBuffer, pIter); + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } sendCommandComplete (replyToKey, sequence); @@ -521,7 +555,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey std::string packageName; inBuffer.getShortString(packageName); - FindOrAddPackageLH(packageName); + findOrAddPackageLH(packageName); } void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -542,11 +576,11 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'q', sequence); - EncodeClassIndication(outBuffer, pIter, cIter); + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } } } @@ -558,26 +592,27 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui std::string packageName; SchemaClassKey key; + uint8_t kind = inBuffer.getOctet(); inBuffer.getShortString(packageName); inBuffer.getShortString(key.name); inBuffer.getBin128(key.hash); - PackageMap::iterator pIter = FindOrAddPackageLH(packageName); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); ClassMap::iterator cIter = pIter->second.find(key); if (cIter == pIter->second.end()) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; uint32_t sequence = nextRequestSequence++; - EncodeHeader (outBuffer, 'S', sequence); + encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); outBuffer.putShortString(key.name); outBuffer.putBin128(key.hash); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); - pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence))); + pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence))); } } @@ -612,11 +647,11 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey SchemaClass& classInfo = cIter->second; if (classInfo.hasSchema()) { - EncodeHeader(outBuffer, 's', sequence); + encodeHeader(outBuffer, 's', sequence); classInfo.appendSchema(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } else sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); @@ -634,9 +669,10 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo SchemaClassKey key; inBuffer.record(); - inBuffer.getShortString (packageName); - inBuffer.getShortString (key.name); - inBuffer.getBin128 (key.hash); + inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); inBuffer.restore(); PackageMap::iterator pIter = packages.find(packageName); @@ -644,7 +680,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { - size_t length = ValidateSchema(inBuffer); + size_t length = validateSchema(inBuffer, cIter->second.kind); if (length == 0) { QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); @@ -658,11 +694,11 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'q'); - EncodeClassIndication(outBuffer, pIter, cIter); + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); + sendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema"); } } } @@ -727,7 +763,7 @@ void ManagementBroker::deleteOrphanedAgentsLH() void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; - uint32_t requestedBank; + uint32_t requestedBrokerBank, requestedAgentBank; uint32_t assignedBank; ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; @@ -737,14 +773,15 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent"); + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); return; } - inBuffer.getShortString (label); - systemId.decode (inBuffer); - requestedBank = inBuffer.getLong (); - assignedBank = assignBankLH (requestedBank); + inBuffer.getShortString(label); + systemId.decode(inBuffer); + requestedBrokerBank = inBuffer.getLong(); + requestedAgentBank = inBuffer.getLong(); + assignedBank = assignBankLH(requestedAgentBank); RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; @@ -755,7 +792,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); - agent->mgmtObject->set_objectIdBank (assignedBank); + agent->mgmtObject->set_brokerBank (brokerBank); + agent->mgmtObject->set_agentBank (assignedBank); addObject (agent->mgmtObject); remoteAgents[connectionRef] = agent; @@ -764,12 +802,12 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'a', sequence); + encodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) @@ -799,12 +837,12 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'g', sequence); + encodeHeader (outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); } } @@ -824,7 +862,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg) msg.encodeContent(inBuffer); inBuffer.reset(); - if (!CheckHeader(inBuffer, &opcode, &sequence)) + if (!checkHeader(inBuffer, &opcode, &sequence)) return false; if (opcode == 'M') { @@ -861,12 +899,12 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg) Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); } return false; @@ -900,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) msg.encodeContent(inBuffer); inBuffer.reset(); - if (!CheckHeader(inBuffer, &opcode, &sequence)) + if (!checkHeader(inBuffer, &opcode, &sequence)) return; if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); @@ -915,7 +953,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); } -ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name) +ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(std::string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) @@ -930,19 +968,20 @@ ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std: Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); + encodeHeader (outBuffer, 'p'); + encodePackageIndication (outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); + sendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); return result.first; } -void ManagementBroker::AddClass(PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) +void ManagementBroker::addClassLH(uint8_t kind, + PackageMap::iterator pIter, + string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -958,71 +997,76 @@ void ManagementBroker::AddClass(PackageMap::iterator pIter, QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << key.name); - cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall))); + cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); cIter = cMap.find(key); } -void ManagementBroker::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) +void ManagementBroker::encodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) { - buf.putShortString ((*pIter).first); + buf.putShortString((*pIter).first); } -void ManagementBroker::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) +void ManagementBroker::encodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); + buf.putOctet((*cIter).second.kind); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128(key.hash); +} + +size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind) +{ + if (kind == ManagementItem::CLASS_KIND_TABLE) + return validateTableSchema(inBuffer); + else if (kind == ManagementItem::CLASS_KIND_EVENT) + return validateEventSchema(inBuffer); + return 0; } -size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) +size_t ManagementBroker::validateTableSchema(Buffer& inBuffer) { uint32_t start = inBuffer.getPosition(); uint32_t end; string text; uint8_t hash[16]; - inBuffer.record(); - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_TABLE) + return 0; - uint16_t propCount = inBuffer.getShort(); - uint16_t statCount = inBuffer.getShort(); - uint16_t methCount = inBuffer.getShort(); - uint16_t evntCount = inBuffer.getShort(); + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); - for (uint16_t idx = 0; idx < propCount + statCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - } + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); - for (uint16_t idx = 0; idx < methCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - if (!ft.isSet("argCount")) - return 0; - int argCount = ft.getInt("argCount"); - for (int mIdx = 0; mIdx < argCount; mIdx++) { - FieldTable aft; - aft.decode(inBuffer); + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); } - } - for (uint16_t idx = 0; idx < evntCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - if (!ft.isSet("argCount")) - return 0; - int argCount = ft.getInt("argCount"); - for (int mIdx = 0; mIdx < argCount; mIdx++) { - FieldTable aft; - aft.decode(inBuffer); + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } } + } catch (std::exception& e) { + return 0; } end = inBuffer.getPosition(); @@ -1030,24 +1074,34 @@ size_t ManagementBroker::ValidateSchema(Buffer& inBuffer) return end - start; } -Mutex& ManagementBroker::getMutex() +size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) { - return userLock; -} + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; -Buffer* ManagementBroker::startEventLH() -{ - Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); - EncodeHeader(*outBuffer, 'e'); - outBuffer->putLongLong(uint64_t(Duration(now()))); - return outBuffer; -} + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_EVENT) + return 0; -void ManagementBroker::finishEventLH(Buffer* outBuffer) -{ - uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); - outBuffer->reset(); - SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event"); - delete outBuffer; -} + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t argCount = inBuffer.getShort(); + for (uint16_t idx = 0; idx < argCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + } catch (std::exception& e) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h index 23fba74b83..c0e0c50963 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -38,11 +38,11 @@ namespace management { class ManagementBroker : public ManagementAgent { - private: +private: int threadPoolSize; - public: +public: ManagementBroker (); virtual ~ManagementBroker (); @@ -52,13 +52,18 @@ class ManagementBroker : public ManagementAgent void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); int getMaxThreads () { return threadPoolSize; } - void RegisterClass (std::string packageName, - std::string className, + void registerClass (std::string& packageName, + std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void registerEvent (std::string& packageName, + std::string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); ObjectId addObject (ManagementObject* object, uint64_t persistId = 0); - void clientAdded (void); + void raiseEvent(const ManagementEvent& event); + void clientAdded (); bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); @@ -68,7 +73,7 @@ class ManagementBroker : public ManagementAgent uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } int getSignalFd () { assert(0); return -1; } - private: +private: friend class ManagementAgent; struct Periodic : public qpid::broker::TimerTask @@ -127,15 +132,16 @@ class ManagementBroker : public ManagementAgent struct SchemaClass { + uint8_t kind; ManagementObject::writeSchemaCall_t writeSchemaCall; uint32_t pendingSequence; size_t bufferLen; uint8_t* buffer; - SchemaClass(uint32_t seq) : - writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} - SchemaClass(ManagementObject::writeSchemaCall_t call) : - writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} + SchemaClass(uint8_t _kind, uint32_t seq) : + kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} + SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : + kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } void appendSchema (framing::Buffer& buf); }; @@ -154,12 +160,12 @@ class ManagementBroker : public ManagementAgent framing::Uuid uuid; sys::Mutex addLock; sys::Mutex userLock; - qpid::broker::Timer timer; + qpid::broker::Timer timer; qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; std::string dataDir; uint16_t interval; - qpid::broker::Broker* broker; + qpid::broker::Broker* broker; uint16_t bootSequence; uint32_t nextObjectId; uint32_t brokerBank; @@ -173,10 +179,10 @@ class ManagementBroker : public ManagementAgent char eventBuffer[MA_BUFFER_SIZE]; void writeData (); - void PeriodicProcessing (void); - void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void SendBuffer (framing::Buffer& buf, + void periodicProcessing (void); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void sendBuffer (framing::Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, std::string routingKey); @@ -185,14 +191,15 @@ class ManagementBroker : public ManagementAgent bool authorizeAgentMessageLH(qpid::broker::Message& msg); void dispatchAgentCommandLH(qpid::broker::Message& msg); - PackageMap::iterator FindOrAddPackageLH(std::string name); - void AddClass(PackageMap::iterator pIter, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void EncodePackageIndication (framing::Buffer& buf, + PackageMap::iterator findOrAddPackageLH(std::string name); + void addClassLH(uint8_t kind, + PackageMap::iterator pIter, + std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void encodePackageIndication (framing::Buffer& buf, PackageMap::iterator pIter); - void EncodeClassIndication (framing::Buffer& buf, + void encodeClassIndication (framing::Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter); bool bankInUse (uint32_t bank); @@ -212,10 +219,9 @@ class ManagementBroker : public ManagementAgent void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - size_t ValidateSchema(framing::Buffer&); - sys::Mutex& getMutex(); - framing::Buffer* startEventLH(); - void finishEventLH(framing::Buffer* outBuffer); + size_t validateSchema(framing::Buffer&, uint8_t kind); + size_t validateTableSchema(framing::Buffer&); + size_t validateEventSchema(framing::Buffer&); }; }} diff --git a/cpp/src/qpid/management/ManagementEvent.h b/cpp/src/qpid/management/ManagementEvent.h new file mode 100644 index 0000000000..d5a47e9144 --- /dev/null +++ b/cpp/src/qpid/management/ManagementEvent.h @@ -0,0 +1,48 @@ +#ifndef _ManagementEvent_ +#define _ManagementEvent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" +#include <qpid/framing/Buffer.h> +#include <string> + +namespace qpid { +namespace management { + +class ManagementAgent; + +class ManagementEvent : public ManagementItem { +public: + typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&); + virtual ~ManagementEvent() {} + + virtual writeSchemaCall_t getWriteSchemaCall(void) = 0; + virtual std::string& getEventName() const = 0; + virtual std::string& getPackageName() const = 0; + virtual uint8_t* getMd5Sum() const = 0; + virtual void encode(qpid::framing::Buffer&) const = 0; +}; + +}} + +#endif /*!_ManagementEvent_*/ diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index e0386ee057..ce65ae3279 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -84,6 +84,21 @@ void ObjectId::decode(framing::Buffer& buffer) second = buffer.getLongLong(); } +namespace qpid { +namespace management { + +std::ostream& operator<<(std::ostream& out, const ObjectId& i) +{ + out << "[" << ((i.first & 0xF000000000000000LL) >> 60) << + "-" << ((i.first & 0x0FFF000000000000LL) >> 48) << + "-" << ((i.first & 0x0000FFFFF0000000LL) >> 32) << + "-" << (i.first & 0x000000000FFFFFFFLL) << + "-" << i.second << "]"; + return out; +} + +}} + int ManagementObject::nextThreadIndex = 0; void ManagementObject::writeTimestamps (Buffer& buf) @@ -109,18 +124,3 @@ int ManagementObject::getThreadIndex() { } return thisIndex; } - -Mutex& ManagementObject::getMutex() -{ - return agent->getMutex(); -} - -Buffer* ManagementObject::startEventLH() -{ - return agent->startEventLH(); -} - -void ManagementObject::finishEventLH(Buffer* buf) -{ - agent->finishEventLH(buf); -} diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 1b809f5125..3778d66b7e 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -46,7 +46,7 @@ public: class ObjectId { -private: +protected: const AgentAttachment* agent; uint64_t first; uint64_t second; @@ -59,23 +59,11 @@ public: bool operator<(const ObjectId &other) const; void encode(framing::Buffer& buffer); void decode(framing::Buffer& buffer); + friend std::ostream& operator<<(std::ostream&, const ObjectId&); }; -class ManagementObject -{ - protected: - - uint64_t createTime; - uint64_t destroyTime; - ObjectId objectId; - bool configChanged; - bool instChanged; - bool deleted; - Manageable* coreObject; - sys::Mutex accessLock; - ManagementAgent* agent; - int maxThreads; - +class ManagementItem { +public: static const uint8_t TYPE_U8 = 1; static const uint8_t TYPE_U16 = 2; static const uint8_t TYPE_U32 = 3; @@ -107,15 +95,35 @@ class ManagementObject static const uint8_t FLAG_INDEX = 0x02; static const uint8_t FLAG_END = 0x80; - static int nextThreadIndex; + const static uint8_t CLASS_KIND_TABLE = 1; + const static uint8_t CLASS_KIND_EVENT = 2; + + + +public: + virtual ~ManagementItem() {} +}; + +class ManagementObject : public ManagementItem +{ + protected: + + uint64_t createTime; + uint64_t destroyTime; + ObjectId objectId; + bool configChanged; + bool instChanged; + bool deleted; + Manageable* coreObject; + sys::Mutex accessLock; + ManagementAgent* agent; + int maxThreads; + + static int nextThreadIndex; int getThreadIndex(); void writeTimestamps (qpid::framing::Buffer& buf); - sys::Mutex& getMutex(); - framing::Buffer* startEventLH(); - void finishEventLH(framing::Buffer* buf); - public: typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); @@ -129,14 +137,14 @@ class ManagementObject virtual void writeProperties(qpid::framing::Buffer& buf) = 0; virtual void writeStatistics(qpid::framing::Buffer& buf, bool skipHeaders = false) = 0; - virtual void doMethod (std::string methodName, + virtual void doMethod (std::string& methodName, qpid::framing::Buffer& inBuf, qpid::framing::Buffer& outBuf) = 0; virtual void setReference (ObjectId objectId); - virtual std::string& getClassName (void) = 0; - virtual std::string& getPackageName (void) = 0; - virtual uint8_t* getMd5Sum (void) = 0; + virtual std::string& getClassName (void) const = 0; + virtual std::string& getPackageName (void) const = 0; + virtual uint8_t* getMd5Sum (void) const = 0; void setObjectId (ObjectId oid) { objectId = oid; } ObjectId getObjectId (void) { return objectId; } |
