diff options
| author | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-05-12 17:04:07 +0000 |
| commit | 0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch) | |
| tree | d478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp/src/qpid/management | |
| parent | 4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff) | |
| download | qpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz | |
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections)
2) Improved handling of federation links:
a) Links can be created even if the remote broker is not reachable
b) If links are lost, re-establishment will occur using an exponential back-off algorithm
3) Durability of exchanges is now viewable through management
4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins.
5) General configuration storage capability has been added to the store/recover interface. This is used for federation links.
6) Management object-ids for durable objects are now themselves durable.
(Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/Manageable.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Manageable.h | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 695 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 156 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 746 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.h | 203 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementExchange.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 1 |
9 files changed, 979 insertions, 859 deletions
diff --git a/cpp/src/qpid/management/Manageable.cpp b/cpp/src/qpid/management/Manageable.cpp index 479cb4e0ce..0f3fbab55c 100644 --- a/cpp/src/qpid/management/Manageable.cpp +++ b/cpp/src/qpid/management/Manageable.cpp @@ -25,13 +25,19 @@ std::string Manageable::StatusText (status_t status) { switch (status) { - case STATUS_OK : return "OK"; - case STATUS_UNKNOWN_OBJECT : return "UnknownObject"; - case STATUS_UNKNOWN_METHOD : return "UnknownMethod"; - case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; - case STATUS_INVALID_PARAMETER : return "InvalidParameter"; + case STATUS_OK : return "OK"; + case STATUS_UNKNOWN_OBJECT : return "UnknownObject"; + case STATUS_UNKNOWN_METHOD : return "UnknownMethod"; + case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; + case STATUS_INVALID_PARAMETER : return "InvalidParameter"; + case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented"; } return "??"; } +Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&) +{ + return STATUS_UNKNOWN_METHOD; +} + diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h index 836ba03b23..25c24588fc 100644 --- a/cpp/src/qpid/management/Manageable.h +++ b/cpp/src/qpid/management/Manageable.h @@ -39,11 +39,12 @@ class Manageable typedef uint32_t status_t; static std::string StatusText (status_t status); - static const status_t STATUS_OK = 0; - static const status_t STATUS_UNKNOWN_OBJECT = 1; - static const status_t STATUS_UNKNOWN_METHOD = 2; - static const status_t STATUS_NOT_IMPLEMENTED = 3; - static const status_t STATUS_INVALID_PARAMETER = 4; + static const status_t STATUS_OK = 0; + static const status_t STATUS_UNKNOWN_OBJECT = 1; + static const status_t STATUS_UNKNOWN_METHOD = 2; + static const status_t STATUS_NOT_IMPLEMENTED = 3; + static const status_t STATUS_INVALID_PARAMETER = 4; + static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5; // Every "Manageable" object must hold a reference to exactly one // management object. This object is always of a class derived from @@ -58,7 +59,7 @@ class Manageable // on this object. The input and output arguments are specific to the // method being called and must be down-cast to the appropriate sub class // before use. - virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0; + virtual status_t ManagementMethod (uint32_t methodId, Args& args); }; inline Manageable::~Manageable (void) {} diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 9b4290232d..e69de29bb2 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -1,695 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementAgent.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/log/Statement.h" -#include <qpid/broker/Message.h> -#include <qpid/broker/MessageDelivery.h> -#include "qpid/framing/MessageTransferBody.h" -#include <list> -#include <iostream> -#include <fstream> - -using boost::intrusive_ptr; -using namespace qpid::framing; -using namespace qpid::management; -using namespace qpid::broker; -using namespace qpid::sys; -using namespace std; - -ManagementAgent::shared_ptr ManagementAgent::agent; -bool ManagementAgent::enabled = 0; - -ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) : - dataDir (_dataDir), interval (_interval) -{ - timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); - localBank = 3; - nextObjectId = 1; - nextRemotePrefix = 101; - - // Get from file or generate and save to file. - if (dataDir.empty ()) - { - uuid.generate (); - bootSequence = 1; - QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " - << uuid); - } - else - { - string filename (dataDir + "/brokerId"); - string seqFilename (dataDir + "/bootseq"); - ifstream inFile (filename.c_str ()); - ifstream seqFile (seqFilename.c_str ()); - - if (inFile.good ()) - { - inFile >> uuid; - inFile.close (); - QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); - } - else - { - uuid.generate (); - QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); - - ofstream outFile (filename.c_str ()); - if (outFile.good ()) - { - outFile << uuid << endl; - outFile.close (); - QPID_LOG (debug, "ManagementAgent saved broker ID"); - } - else - { - QPID_LOG (warning, "ManagementAgent unable to save broker ID"); - } - } - - if (seqFile.good ()) - { - seqFile >> bootSequence; - seqFile.close (); - } - else - bootSequence = 1; - - ofstream seqOut (seqFilename.c_str ()); - if (seqOut.good ()) - { - uint16_t nextSeq = (bootSequence + 1) & 0x7FFF; - if (nextSeq == 0) - nextSeq = 1; - seqOut << nextSeq << endl; - seqOut.close (); - } - - QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); - } -} - -ManagementAgent::~ManagementAgent () {} - -void ManagementAgent::enableManagement (string dataDir, uint16_t interval) -{ - enabled = 1; - if (agent.get () == 0) - agent = shared_ptr (new ManagementAgent (dataDir, interval)); -} - -ManagementAgent::shared_ptr ManagementAgent::getAgent (void) -{ - return agent; -} - -void ManagementAgent::shutdown (void) -{ - if (agent.get () != 0) - { - agent->mExchange.reset (); - agent->dExchange.reset (); - agent.reset (); - } -} - -void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, - broker::Exchange::shared_ptr _dexchange) -{ - mExchange = _mexchange; - dExchange = _dexchange; -} - -void ManagementAgent::RegisterClass (string packageName, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - Mutex::ScopedLock lock (userLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); -} - -void ManagementAgent::addObject (ManagementObject::shared_ptr object, - uint32_t persistId, - uint32_t persistBank) -{ - Mutex::ScopedLock lock (userLock); - uint64_t objectId; - - if (persistId == 0) - objectId = ((uint64_t) bootSequence) << 48 | - ((uint64_t) localBank) << 24 | nextObjectId++; - else - objectId = ((uint64_t) persistBank) << 24 | persistId; - - object->setObjectId (objectId); - managementObjects[objectId] = object; -} - -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} - -ManagementAgent::Periodic::~Periodic () {} - -void ManagementAgent::Periodic::fire () -{ - agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval))); - agent.PeriodicProcessing (); -} - -void ManagementAgent::clientAdded (void) -{ - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = iter->second; - object->setAllChanged (); - } -} - -void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet ('A'); - buf.putOctet ('M'); - buf.putOctet ('1'); - buf.putOctet (opcode); - buf.putLong (seq); -} - -bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); - - *opcode = buf.getOctet (); - *seq = buf.getLong (); - - return h1 == 'A' && h2 == 'M' && h3 == '1'; -} - -void ManagementAgent::SendBuffer (Buffer& buf, - uint32_t length, - broker::Exchange::shared_ptr exchange, - string routingKey) -{ - if (exchange.get() == 0) - return; - - intrusive_ptr<Message> msg (new Message ()); - AMQFrame method (in_place<MessageTransferBody>( - ProtocolVersion(), exchange->getName (), 0, 0)); - AMQFrame header (in_place<AMQHeaderBody>()); - AMQFrame content(in_place<AMQContentBody>()); - - content.castBody<AMQContentBody>()->decode(buf, length); - - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(length); - msg->getFrames().append(content); - - DeliverableMessage deliverable (msg); - exchange->route (deliverable, routingKey, 0); -} - -void ManagementAgent::PeriodicProcessing (void) -{ -#define BUFSIZE 65536 - Mutex::ScopedLock lock (userLock); - char msgChars[BUFSIZE]; - uint32_t contentSize; - string routingKey; - std::list<uint64_t> deleteList; - - if (managementObjects.empty ()) - return; - - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = iter->second; - - if (object->getConfigChanged () || object->isDeleted ()) - { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); - object->writeConfig (msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - - if (object->getInstChanged ()) - { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); - object->writeInstrumentation (msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - - if (object->isDeleted ()) - deleteList.push_back (iter->first); - } - - // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); - iter++) - managementObjects.erase (*iter); - - deleteList.clear (); -} - -void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, - uint32_t code, string text) -{ - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'z', sequence); - outBuffer.putLong (code); - outBuffer.putShortString (text); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementAgent::dispatchCommand (Deliverable& deliverable, - const string& routingKey, - const FieldTable* /*args*/) -{ - Mutex::ScopedLock lock (userLock); - Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - - if (routingKey.compare (0, 13, "agent.method.") == 0) - dispatchMethod (msg, routingKey, 13); - - else if (routingKey.length () == 5 && - routingKey.compare (0, 5, "agent") == 0) - dispatchAgentCommand (msg); - - else - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } -} - -void ManagementAgent::dispatchMethod (Message& msg, - const string& routingKey, - size_t first) -{ - size_t pos, start = first; - uint32_t contentSize; - - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); - return; - } - - string packageName = routingKey.substr (start, pos - start); - - start = pos + 1; - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); - return; - } - - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - string methodName = routingKey.substr (start, routingKey.length () - start); - - contentSize = msg.encodedContentSize (); - if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) - return; - - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen, sequence; - uint8_t opcode; - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - { - QPID_LOG (debug, " Invalid content header"); - return; - } - - if (opcode != 'M') - { - QPID_LOG (debug, " Unexpected opcode " << opcode); - return; - } - - uint64_t objId = inBuffer.getLongLong (); - string replyToKey; - - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - { - QPID_LOG (debug, " Reply-to missing"); - return; - } - - EncodeHeader (outBuffer, 'm', sequence); - - ManagementObjectMap::iterator iter = managementObjects.find (objId); - if (iter == managementObjects.end () || iter->second->isDeleted ()) - { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } - else - { - iter->second->doMethod (methodName, inBuffer, outBuffer); - } - - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence) -{ - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'b', sequence); - uuid.encode (outBuffer); - - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence) -{ - for (PackageMap::iterator pIter = packages.begin (); - pIter != packages.end (); - pIter++) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'p', sequence); - EncodePackageIndication (outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - - sendCommandComplete (replyToKey, sequence); -} - -void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) -{ - std::string packageName; - - inBuffer.getShortString (packageName); - FindOrAddPackage (packageName); -} - -void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - std::string packageName; - - inBuffer.getShortString (packageName); - PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) - { - ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin (); - cIter != cMap.end (); - cIter++) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'q', sequence); - EncodeClassIndication (outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - } - - sendCommandComplete (replyToKey, sequence); -} - -void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - string packageName; - SchemaClassKey key; - - inBuffer.getShortString (packageName); - inBuffer.getShortString (key.name); - inBuffer.getBin128 (key.hash); - - PackageMap::iterator pIter = packages.find (packageName); - if (pIter != packages.end ()) - { - ClassMap cMap = pIter->second; - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - SchemaClass classInfo = cIter->second; - - if (classInfo.writeSchemaCall != 0) - { - EncodeHeader (outBuffer, 's', sequence); - classInfo.writeSchemaCall (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - else - { - // TODO: Forward request to remote agent. - } - - clientAdded (); - // TODO: Send client-added to each remote agent. - } - } -} - -uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/) -{ - // TODO: Allow remote agents to keep their requested prefixes if able. - return nextRemotePrefix++; -} - -void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - string label; - uint32_t requestedPrefix; - uint32_t assignedPrefix; - - inBuffer.getShortString (label); - requestedPrefix = inBuffer.getLong (); - assignedPrefix = assignPrefix (requestedPrefix); - - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'a', sequence); - outBuffer.putLong (assignedPrefix); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - FieldTable ft; - FieldTable::ValuePtr value; - - ft.decode (inBuffer); - value = ft.get ("_class"); - if (value->empty () || !value->convertsTo<string> ()) - { - // TODO: Send completion with an error code - return; - } - - string className (value->get<string> ()); - - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = iter->second; - if (object->getClassName () == className) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'g', sequence); - object->writeConfig (outBuffer); - object->writeInstrumentation (outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - } - - sendCommandComplete (replyToKey, sequence); -} - -void ManagementAgent::dispatchAgentCommand (Message& msg) -{ - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; - string replyToKey; - - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) - { - const framing::ReplyTo& rt = p->getReplyTo (); - replyToKey = rt.getRoutingKey (); - } - else - return; - - msg.encodeContent (inBuffer); - inBuffer.reset (); - - if (!CheckHeader (inBuffer, &opcode, &sequence)) - return; - - if (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence); - else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey, sequence); - else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey, sequence); - else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey, sequence); - else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey, sequence); - else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence); - else if (opcode == 'G') handleGetRequest (inBuffer, replyToKey, sequence); -} - -ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name) -{ - PackageMap::iterator pIter = packages.find (name); - if (pIter != packages.end ()) - return pIter; - - // No such package found, create a new map entry. - pair<PackageMap::iterator, bool> result = - packages.insert (pair<string, ClassMap> (name, ClassMap ())); - QPID_LOG (debug, "ManagementAgent added package " << name); - - // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); - - return result.first; -} - -void ManagementAgent::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - SchemaClassKey key; - ClassMap& cMap = pIter->second; - - key.name = className; - memcpy (&key.hash, md5Sum, 16); - - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) - return; - - // No such class found, create a new class with local information. - QPID_LOG (debug, "ManagementAgent added class " << pIter->first << "." << - key.name); - SchemaClass classInfo; - - classInfo.writeSchemaCall = schemaCall; - cMap[key] = classInfo; - - // TODO: Publish a class-indication message -} - -void ManagementAgent::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) -{ - buf.putShortString ((*pIter).first); -} - -void ManagementAgent::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) -{ - SchemaClassKey key = (*cIter).first; - - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); -} - diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 4cd679a035..c38e273c49 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -22,168 +22,28 @@ * */ -#include "qpid/Options.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Timer.h" -#include "qpid/framing/Uuid.h" -#include "qpid/sys/Mutex.h" #include "ManagementObject.h" -#include <qpid/framing/AMQFrame.h> -#include <boost/shared_ptr.hpp> namespace qpid { namespace management { class ManagementAgent { - private: - - ManagementAgent (std::string dataDir, uint16_t interval); - public: - virtual ~ManagementAgent (); + virtual ~ManagementAgent () {} typedef boost::shared_ptr<ManagementAgent> shared_ptr; - static void enableManagement (std::string dataDir, uint16_t interval); static shared_ptr getAgent (void); - static void shutdown (void); - - void setInterval (uint16_t _interval) { interval = _interval; } - void setExchange (broker::Exchange::shared_ptr mgmtExchange, - broker::Exchange::shared_ptr directExchange); - void RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void addObject (ManagementObject::shared_ptr object, - uint32_t persistId = 0, - uint32_t persistBank = 2); - void clientAdded (void); - void dispatchCommand (broker::Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); - - private: - - struct Periodic : public broker::TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - virtual ~Periodic (); - void fire (); - }; - - // Storage for tracking remote management agents, attached via the client - // management agent API. - // - struct RemoteAgent - { - std::string name; - uint64_t objIdBase; - }; - - // TODO: Eventually replace string with entire reply-to structure. reply-to - // currently assumes that the exchange is "amq.direct" even though it could - // in theory be specified differently. - typedef std::map<std::string, RemoteAgent> RemoteAgentMap; - typedef std::vector<std::string> ReplyToVector; - - // Storage for known schema classes: - // - // SchemaClassKey -- Key elements for map lookups - // SchemaClassKeyComp -- Comparison class for SchemaClassKey - // SchemaClass -- Non-key elements for classes - // - struct SchemaClassKey - { - std::string name; - uint8_t hash[16]; - }; - - struct SchemaClassKeyComp - { - bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const - { - if (lhs.name != rhs.name) - return lhs.name < rhs.name; - else - for (int i = 0; i < 16; i++) - if (lhs.hash[i] != rhs.hash[i]) - return lhs.hash[i] < rhs.hash[i]; - return false; - } - }; - - struct SchemaClass - { - ManagementObject::writeSchemaCall_t writeSchemaCall; - ReplyToVector remoteAgents; - - SchemaClass () : writeSchemaCall(0) {} - }; - - typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; - typedef std::map<std::string, ClassMap> PackageMap; - - RemoteAgentMap remoteAgents; - PackageMap packages; - ManagementObjectMap managementObjects; - - static shared_ptr agent; - static bool enabled; - - qpid::framing::Uuid uuid; - qpid::sys::Mutex userLock; - broker::Timer timer; - broker::Exchange::shared_ptr mExchange; - broker::Exchange::shared_ptr dExchange; - std::string dataDir; - uint16_t interval; - uint16_t bootSequence; - uint32_t localBank; - uint32_t nextObjectId; - uint32_t nextRemotePrefix; - -# define MA_BUFFER_SIZE 65536 - char inputBuffer[MA_BUFFER_SIZE]; - char outputBuffer[MA_BUFFER_SIZE]; - - void PeriodicProcessing (void); - void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void SendBuffer (qpid::framing::Buffer& buf, - uint32_t length, - broker::Exchange::shared_ptr exchange, - std::string routingKey); - - void dispatchMethod (broker::Message& msg, - const std::string& routingKey, - size_t first); - void dispatchAgentCommand (broker::Message& msg); - PackageMap::iterator FindOrAddPackage (std::string name); - void AddClassLocal (PackageMap::iterator pIter, - std::string className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void EncodePackageIndication (qpid::framing::Buffer& buf, - PackageMap::iterator pIter); - void EncodeClassIndication (qpid::framing::Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter); - uint32_t assignPrefix (uint32_t requestedPrefix); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, - uint32_t code = 0, std::string text = std::string("OK")); - void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + virtual void RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) = 0; + virtual void addObject (ManagementObject::shared_ptr object, + uint32_t persistId = 0, + uint32_t persistBank = 4) = 0; }; }} diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp new file mode 100644 index 0000000000..6466028c00 --- /dev/null +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -0,0 +1,746 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementBroker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include <qpid/broker/Message.h> +#include <qpid/broker/MessageDelivery.h> +#include "qpid/framing/MessageTransferBody.h" +#include <list> +#include <iostream> +#include <fstream> + +using boost::intrusive_ptr; +using qpid::framing::Uuid; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::sys; +using namespace std; + +ManagementAgent::shared_ptr ManagementBroker::agent; +bool ManagementBroker::enabled = 0; + +ManagementBroker::RemoteAgent::~RemoteAgent () +{ + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} + +ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker) : + dataDir (_dataDir), interval (_interval), broker (_broker) +{ + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); + localBank = 5; + nextObjectId = 1; + bootSequence = 1; + nextRemoteBank = 10; + + // Get from file or generate and save to file. + if (dataDir.empty ()) + { + uuid.generate (); + QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " + << uuid); + } + else + { + string filename (dataDir + "/.mbrokerdata"); + ifstream inFile (filename.c_str ()); + + if (inFile.good ()) + { + inFile >> uuid; + inFile >> bootSequence; + inFile >> nextRemoteBank; + inFile.close (); + QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + + bootSequence++; + writeData (); + } + else + { + uuid.generate (); + QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); + writeData (); + } + + QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); + } +} + +ManagementBroker::~ManagementBroker () {} + +void ManagementBroker::writeData () +{ + string filename (dataDir + "/.mbrokerdata"); + ofstream outFile (filename.c_str ()); + + if (outFile.good ()) + { + outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; + outFile.close (); + } +} + +void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker) +{ + enabled = 1; + if (agent.get () == 0) + agent = shared_ptr (new ManagementBroker (dataDir, interval, broker)); +} + +ManagementAgent::shared_ptr ManagementAgent::getAgent (void) +{ + return ManagementBroker::agent; +} + +void ManagementBroker::shutdown (void) +{ + if (agent.get () != 0) + { + ManagementBroker* broker = (ManagementBroker*) agent.get(); + + broker->mExchange.reset (); + broker->dExchange.reset (); + agent.reset (); + } +} + +void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange, + broker::Exchange::shared_ptr _dexchange) +{ + mExchange = _mexchange; + dExchange = _dexchange; +} + +void ManagementBroker::RegisterClass (string packageName, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock (userLock); + PackageMap::iterator pIter = FindOrAddPackage (packageName); + AddClassLocal (pIter, className, md5Sum, schemaCall); +} + +void ManagementBroker::addObject (ManagementObject::shared_ptr object, + uint32_t persistId, + uint32_t persistBank) +{ + Mutex::ScopedLock lock (userLock); + uint64_t objectId; + + if (persistId == 0) + objectId = ((uint64_t) bootSequence) << 48 | + ((uint64_t) localBank) << 24 | nextObjectId++; + else + objectId = ((uint64_t) persistBank) << 24 | persistId; + + object->setObjectId (objectId); + managementObjects[objectId] = object; +} + +ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) + : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), broker(_broker) {} + +ManagementBroker::Periodic::~Periodic () {} + +void ManagementBroker::Periodic::fire () +{ + broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval))); + broker.PeriodicProcessing (); +} + +void ManagementBroker::clientAdded (void) +{ + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + object->setAllChanged (); + } +} + +void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + + *opcode = buf.getOctet (); + *seq = buf.getLong (); + + return h1 == 'A' && h2 == 'M' && h3 == '1'; +} + +void ManagementBroker::SendBuffer (Buffer& buf, + uint32_t length, + broker::Exchange::shared_ptr exchange, + string routingKey) +{ + if (exchange.get() == 0) + return; + + intrusive_ptr<Message> msg (new Message ()); + AMQFrame method (in_place<MessageTransferBody>( + ProtocolVersion(), exchange->getName (), 0, 0)); + AMQFrame header (in_place<AMQHeaderBody>()); + AMQFrame content(in_place<AMQContentBody>()); + + content.castBody<AMQContentBody>()->decode(buf, length); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(length); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + exchange->route (deliverable, routingKey, 0); +} + +void ManagementBroker::PeriodicProcessing (void) +{ +#define BUFSIZE 65536 + Mutex::ScopedLock lock (userLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; + std::list<uint64_t> deleteList; + + if (managementObjects.empty ()) + return; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + + if (object->getConfigChanged () || object->isDeleted ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'c'); + object->writeConfig (msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + if (object->getInstChanged ()) + { + Buffer msgBuffer (msgChars, BUFSIZE); + EncodeHeader (msgBuffer, 'i'); + object->writeInstrumentation (msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName (); + SendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + if (object->isDeleted ()) + deleteList.push_back (iter->first); + } + + // Delete flagged objects + for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + managementObjects.erase (*iter); + + deleteList.clear (); +} + +void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, + uint32_t code, string text) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'z', sequence); + outBuffer.putLong (code); + outBuffer.putShortString (text); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::dispatchCommand (Deliverable& deliverable, + const string& routingKey, + const FieldTable* /*args*/) +{ + Mutex::ScopedLock lock (userLock); + Message& msg = ((DeliverableMessage&) deliverable).getMessage (); + + if (routingKey.compare (0, 13, "agent.method.") == 0) + dispatchMethodLH (msg, routingKey, 13); + + else if (routingKey.length () == 5 && + routingKey.compare (0, 5, "agent") == 0) + dispatchAgentCommandLH (msg); + + else + { + QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); + return; + } +} + +void ManagementBroker::dispatchMethodLH (Message& msg, + const string& routingKey, + size_t first) +{ + size_t pos, start = first; + uint32_t contentSize; + + if (routingKey.length () == start) + { + QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); + return; + } + + pos = routingKey.find ('.', start); + if (pos == string::npos || routingKey.length () == pos + 1) + { + QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); + return; + } + + string packageName = routingKey.substr (start, pos - start); + + start = pos + 1; + pos = routingKey.find ('.', start); + if (pos == string::npos || routingKey.length () == pos + 1) + { + QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); + return; + } + + string className = routingKey.substr (start, pos - start); + + start = pos + 1; + string methodName = routingKey.substr (start, routingKey.length () - start); + + contentSize = msg.encodedContentSize (); + if (contentSize < 8 || contentSize > MA_BUFFER_SIZE) + return; + + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen, sequence; + uint8_t opcode; + + msg.encodeContent (inBuffer); + inBuffer.reset (); + + if (!CheckHeader (inBuffer, &opcode, &sequence)) + { + QPID_LOG (debug, " Invalid content header"); + return; + } + + if (opcode != 'M') + { + QPID_LOG (debug, " Unexpected opcode " << opcode); + return; + } + + uint64_t objId = inBuffer.getLongLong (); + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + { + QPID_LOG (debug, " Reply-to missing"); + return; + } + + EncodeHeader (outBuffer, 'm', sequence); + + ManagementObjectMap::iterator iter = managementObjects.find (objId); + if (iter == managementObjects.end () || iter->second->isDeleted ()) + { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } + else + { + iter->second->doMethod (methodName, inBuffer, outBuffer); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'b', sequence); + uuid.encode (outBuffer); + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) +{ + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p', sequence); + EncodePackageIndication (outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + FindOrAddPackage (packageName); +} + +void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + std::string packageName; + + inBuffer.getShortString (packageName); + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin (); + cIter != cMap.end (); + cIter++) + { + if (cIter->second.hasSchema ()) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'q', sequence); + EncodeClassIndication (outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::SchemaClass::appendSchema (Buffer& buf) +{ + // If the management package is attached locally (embedded in the broker or + // linked in via plug-in), call the schema handler directly. If the package + // is from a remote management agent, send the stored schema information. + + if (writeSchemaCall != 0) + writeSchemaCall (buf); + else + buf.putRawData (buffer, bufferLen); +} + +void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + + PackageMap::iterator pIter = packages.find (packageName); + if (pIter != packages.end ()) + { + ClassMap cMap = pIter->second; + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + SchemaClass classInfo = cIter->second; + + if (classInfo.hasSchema()) + { + EncodeHeader (outBuffer, 's', sequence); + classInfo.appendSchema (outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + + clientAdded (); + // TODO: Send client-added to each remote agent. + } + } +} + +bool ManagementBroker::bankInUse (uint32_t bank) +{ + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) + if (aIter->second->objIdBank == bank) + return true; + return false; +} + +uint32_t ManagementBroker::allocateNewBank () +{ + while (bankInUse (nextRemoteBank)) + nextRemoteBank++; + + uint32_t allocated = nextRemoteBank++; + writeData (); + return allocated; +} + +uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) +{ + if (requestedBank == 0 || bankInUse (requestedBank)) + return allocateNewBank (); + return requestedBank; +} + +void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string label; + uint32_t requestedBank; + uint32_t assignedBank; + Uuid sessionId; + Uuid systemId; + + inBuffer.getShortString (label); + sessionId.decode (inBuffer); + systemId.decode (inBuffer); + requestedBank = inBuffer.getLong (); + assignedBank = assignBankLH (requestedBank); + + RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId); + if (aIter != remoteAgents.end()) + { + // There already exists an agent on this session. Reject the request. + sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent"); + return; + } + + RemoteAgent* agent = new RemoteAgent; + agent->objIdBank = assignedBank; + agent->mgmtObject = management::Agent::shared_ptr + (new management::Agent (agent)); + agent->mgmtObject->set_sessionId (sessionId); + agent->mgmtObject->set_label (label); + agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); + agent->mgmtObject->set_sysId (systemId); + addObject (agent->mgmtObject); + + remoteAgents[sessionId] = agent; + + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (assignedBank); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + ft.decode (inBuffer); + value = ft.get ("_class"); + if (value->empty () || !value->convertsTo<string> ()) + { + // TODO: Send completion with an error code + return; + } + + string className (value->get<string> ()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + if (object->getClassName () == className) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'g', sequence); + object->writeConfig (outBuffer); + object->writeInstrumentation (outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementBroker::dispatchAgentCommandLH (Message& msg) +{ + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + return; + + msg.encodeContent (inBuffer); + inBuffer.reset (); + + if (!CheckHeader (inBuffer, &opcode, &sequence)) + return; + + if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); +} + +ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + pair<PackageMap::iterator, bool> result = + packages.insert (pair<string, ClassMap> (name, ClassMap ())); + QPID_LOG (debug, "ManagementBroker added package " << name); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader (outBuffer, 'p'); + EncodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package"); + + return result.first; +} + +void ManagementBroker::AddClassLocal (PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy (&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find (key); + if (cIter != cMap.end ()) + return; + + // No such class found, create a new class with local information. + QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." << + key.name); + SchemaClass classInfo; + + classInfo.writeSchemaCall = schemaCall; + cMap[key] = classInfo; + + // TODO: Publish a class-indication message +} + +void ManagementBroker::EncodePackageIndication (Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString ((*pIter).first); +} + +void ManagementBroker::EncodeClassIndication (Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putShortString ((*pIter).first); + buf.putShortString (key.name); + buf.putBin128 (key.hash); +} + diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h new file mode 100644 index 0000000000..2e02cb2a43 --- /dev/null +++ b/cpp/src/qpid/management/ManagementBroker.h @@ -0,0 +1,203 @@ +#ifndef _ManagementBroker_ +#define _ManagementBroker_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Timer.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" +#include "ManagementAgent.h" +#include "ManagementObject.h" +#include "Manageable.h" +#include "qpid/management/Agent.h" +#include <qpid/framing/AMQFrame.h> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace management { + +class ManagementBroker : public ManagementAgent +{ + private: + + ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker); + + public: + + virtual ~ManagementBroker (); + + static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker); + static shared_ptr getAgent (void); + static void shutdown (void); + + void setInterval (uint16_t _interval) { interval = _interval; } + void setExchange (broker::Exchange::shared_ptr mgmtExchange, + broker::Exchange::shared_ptr directExchange); + void RegisterClass (std::string packageName, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void addObject (ManagementObject::shared_ptr object, + uint32_t persistId = 0, + uint32_t persistBank = 4); + void clientAdded (void); + void dispatchCommand (broker::Deliverable& msg, + const std::string& routingKey, + const framing::FieldTable* args); + + private: + friend class ManagementAgent; + + struct Periodic : public broker::TimerTask + { + ManagementBroker& broker; + + Periodic (ManagementBroker& broker, uint32_t seconds); + virtual ~Periodic (); + void fire (); + }; + + // Storage for tracking remote management agents, attached via the client + // management agent API. + // + struct RemoteAgent : public Manageable + { + uint32_t objIdBank; + Agent::shared_ptr mgmtObject; + ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } + virtual ~RemoteAgent (); + }; + + // TODO: Eventually replace string with entire reply-to structure. reply-to + // currently assumes that the exchange is "amq.direct" even though it could + // in theory be specified differently. + typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap; + typedef std::vector<std::string> ReplyToVector; + + // Storage for known schema classes: + // + // SchemaClassKey -- Key elements for map lookups + // SchemaClassKeyComp -- Comparison class for SchemaClassKey + // SchemaClass -- Non-key elements for classes + // + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass + { + ManagementObject::writeSchemaCall_t writeSchemaCall; + ReplyToVector remoteAgents; + size_t bufferLen; + uint8_t* buffer; + + SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {} + bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } + void appendSchema (framing::Buffer& buf); + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + RemoteAgentMap remoteAgents; + PackageMap packages; + ManagementObjectMap managementObjects; + + static shared_ptr agent; + static bool enabled; + + framing::Uuid uuid; + sys::Mutex userLock; + broker::Timer timer; + broker::Exchange::shared_ptr mExchange; + broker::Exchange::shared_ptr dExchange; + std::string dataDir; + uint16_t interval; + Manageable* broker; + uint16_t bootSequence; + uint32_t localBank; + uint32_t nextObjectId; + uint32_t nextRemoteBank; + +# define MA_BUFFER_SIZE 65536 + char inputBuffer[MA_BUFFER_SIZE]; + char outputBuffer[MA_BUFFER_SIZE]; + + void writeData (); + void PeriodicProcessing (void); + void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void SendBuffer (framing::Buffer& buf, + uint32_t length, + broker::Exchange::shared_ptr exchange, + std::string routingKey); + + void dispatchMethodLH (broker::Message& msg, + const std::string& routingKey, + size_t first); + void dispatchAgentCommandLH (broker::Message& msg); + + PackageMap::iterator FindOrAddPackage (std::string name); + void AddClassLocal (PackageMap::iterator pIter, + std::string className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void EncodePackageIndication (framing::Buffer& buf, + PackageMap::iterator pIter); + void EncodeClassIndication (framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + bool bankInUse (uint32_t bank); + uint32_t allocateNewBank (); + uint32_t assignBankLH (uint32_t requestedPrefix); + void sendCommandComplete (std::string replyToKey, uint32_t sequence, + uint32_t code = 0, std::string text = std::string("OK")); + void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); +}; + +}} + +#endif /*!_ManagementBroker_*/ diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index c589aefba0..28e6fb8d0a 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -53,7 +53,7 @@ void ManagementExchange::route (Deliverable& msg, TopicExchange::route (msg, routingKey, args); } -void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) +void ManagementExchange::setManagmentAgent (ManagementBroker* agent) { managementAgent = agent; } diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index 7faec32b0f..28066b1e80 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -22,7 +22,7 @@ #define _ManagementExchange_ #include "qpid/broker/TopicExchange.h" -#include "ManagementAgent.h" +#include "ManagementBroker.h" namespace qpid { namespace broker { @@ -30,7 +30,7 @@ namespace broker { class ManagementExchange : public virtual TopicExchange { private: - management::ManagementAgent::shared_ptr managementAgent; + management::ManagementBroker* managementAgent; public: static const std::string typeName; @@ -46,7 +46,7 @@ class ManagementExchange : public virtual TopicExchange const string& routingKey, const qpid::framing::FieldTable* args); - void setManagmentAgent (management::ManagementAgent::shared_ptr agent); + void setManagmentAgent (management::ManagementBroker* agent); virtual ~ManagementExchange(); }; diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 48a3372d16..047f8c5754 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -22,7 +22,6 @@ * */ -#include "Manageable.h" #include "qpid/sys/Time.h" #include "qpid/sys/Mutex.h" #include <qpid/framing/Buffer.h> |
