summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-05-12 17:04:07 +0000
committerGordon Sim <gsim@apache.org>2008-05-12 17:04:07 +0000
commit0655ff5aceb9d53eb256a05d7beb55b1c803c8de (patch)
treed478a719d5a5d030c3e228d298c6be8378d4fe44 /cpp/src/qpid/management
parent4a1605e6b357c251398aca281b90452c1cbd5ab2 (diff)
downloadqpid-python-0655ff5aceb9d53eb256a05d7beb55b1c803c8de.tar.gz
QPID-1050: Patch from Ted Ross:
1) Durability for federation links (broker-to-broker connections) 2) Improved handling of federation links: a) Links can be created even if the remote broker is not reachable b) If links are lost, re-establishment will occur using an exponential back-off algorithm 3) Durability of exchanges is now viewable through management 4) ManagementAgent API has been moved to an interface class to reduce coupling between the broker and manageable plug-ins. 5) General configuration storage capability has been added to the store/recover interface. This is used for federation links. 6) Management object-ids for durable objects are now themselves durable. (Note: some refactoring needed around ProtocolAccess needed to try and reduce dependencies) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@655563 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/Manageable.cpp16
-rw-r--r--cpp/src/qpid/management/Manageable.h13
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp695
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h156
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp746
-rw-r--r--cpp/src/qpid/management/ManagementBroker.h203
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp2
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h6
-rw-r--r--cpp/src/qpid/management/ManagementObject.h1
9 files changed, 979 insertions, 859 deletions
diff --git a/cpp/src/qpid/management/Manageable.cpp b/cpp/src/qpid/management/Manageable.cpp
index 479cb4e0ce..0f3fbab55c 100644
--- a/cpp/src/qpid/management/Manageable.cpp
+++ b/cpp/src/qpid/management/Manageable.cpp
@@ -25,13 +25,19 @@ std::string Manageable::StatusText (status_t status)
{
switch (status)
{
- case STATUS_OK : return "OK";
- case STATUS_UNKNOWN_OBJECT : return "UnknownObject";
- case STATUS_UNKNOWN_METHOD : return "UnknownMethod";
- case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
- case STATUS_INVALID_PARAMETER : return "InvalidParameter";
+ case STATUS_OK : return "OK";
+ case STATUS_UNKNOWN_OBJECT : return "UnknownObject";
+ case STATUS_UNKNOWN_METHOD : return "UnknownMethod";
+ case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
+ case STATUS_INVALID_PARAMETER : return "InvalidParameter";
+ case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented";
}
return "??";
}
+Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&)
+{
+ return STATUS_UNKNOWN_METHOD;
+}
+
diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h
index 836ba03b23..25c24588fc 100644
--- a/cpp/src/qpid/management/Manageable.h
+++ b/cpp/src/qpid/management/Manageable.h
@@ -39,11 +39,12 @@ class Manageable
typedef uint32_t status_t;
static std::string StatusText (status_t status);
- static const status_t STATUS_OK = 0;
- static const status_t STATUS_UNKNOWN_OBJECT = 1;
- static const status_t STATUS_UNKNOWN_METHOD = 2;
- static const status_t STATUS_NOT_IMPLEMENTED = 3;
- static const status_t STATUS_INVALID_PARAMETER = 4;
+ static const status_t STATUS_OK = 0;
+ static const status_t STATUS_UNKNOWN_OBJECT = 1;
+ static const status_t STATUS_UNKNOWN_METHOD = 2;
+ static const status_t STATUS_NOT_IMPLEMENTED = 3;
+ static const status_t STATUS_INVALID_PARAMETER = 4;
+ static const status_t STATUS_FEATURE_NOT_IMPLEMENTED = 5;
// Every "Manageable" object must hold a reference to exactly one
// management object. This object is always of a class derived from
@@ -58,7 +59,7 @@ class Manageable
// on this object. The input and output arguments are specific to the
// method being called and must be down-cast to the appropriate sub class
// before use.
- virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0;
+ virtual status_t ManagementMethod (uint32_t methodId, Args& args);
};
inline Manageable::~Manageable (void) {}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 9b4290232d..e69de29bb2 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -1,695 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ManagementAgent.h"
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/log/Statement.h"
-#include <qpid/broker/Message.h>
-#include <qpid/broker/MessageDelivery.h>
-#include "qpid/framing/MessageTransferBody.h"
-#include <list>
-#include <iostream>
-#include <fstream>
-
-using boost::intrusive_ptr;
-using namespace qpid::framing;
-using namespace qpid::management;
-using namespace qpid::broker;
-using namespace qpid::sys;
-using namespace std;
-
-ManagementAgent::shared_ptr ManagementAgent::agent;
-bool ManagementAgent::enabled = 0;
-
-ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) :
- dataDir (_dataDir), interval (_interval)
-{
- timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
- localBank = 3;
- nextObjectId = 1;
- nextRemotePrefix = 101;
-
- // Get from file or generate and save to file.
- if (dataDir.empty ())
- {
- uuid.generate ();
- bootSequence = 1;
- QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
- << uuid);
- }
- else
- {
- string filename (dataDir + "/brokerId");
- string seqFilename (dataDir + "/bootseq");
- ifstream inFile (filename.c_str ());
- ifstream seqFile (seqFilename.c_str ());
-
- if (inFile.good ())
- {
- inFile >> uuid;
- inFile.close ();
- QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
- }
- else
- {
- uuid.generate ();
- QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
-
- ofstream outFile (filename.c_str ());
- if (outFile.good ())
- {
- outFile << uuid << endl;
- outFile.close ();
- QPID_LOG (debug, "ManagementAgent saved broker ID");
- }
- else
- {
- QPID_LOG (warning, "ManagementAgent unable to save broker ID");
- }
- }
-
- if (seqFile.good ())
- {
- seqFile >> bootSequence;
- seqFile.close ();
- }
- else
- bootSequence = 1;
-
- ofstream seqOut (seqFilename.c_str ());
- if (seqOut.good ())
- {
- uint16_t nextSeq = (bootSequence + 1) & 0x7FFF;
- if (nextSeq == 0)
- nextSeq = 1;
- seqOut << nextSeq << endl;
- seqOut.close ();
- }
-
- QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence);
- }
-}
-
-ManagementAgent::~ManagementAgent () {}
-
-void ManagementAgent::enableManagement (string dataDir, uint16_t interval)
-{
- enabled = 1;
- if (agent.get () == 0)
- agent = shared_ptr (new ManagementAgent (dataDir, interval));
-}
-
-ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
-{
- return agent;
-}
-
-void ManagementAgent::shutdown (void)
-{
- if (agent.get () != 0)
- {
- agent->mExchange.reset ();
- agent->dExchange.reset ();
- agent.reset ();
- }
-}
-
-void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
- broker::Exchange::shared_ptr _dexchange)
-{
- mExchange = _mexchange;
- dExchange = _dexchange;
-}
-
-void ManagementAgent::RegisterClass (string packageName,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- Mutex::ScopedLock lock (userLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
-}
-
-void ManagementAgent::addObject (ManagementObject::shared_ptr object,
- uint32_t persistId,
- uint32_t persistBank)
-{
- Mutex::ScopedLock lock (userLock);
- uint64_t objectId;
-
- if (persistId == 0)
- objectId = ((uint64_t) bootSequence) << 48 |
- ((uint64_t) localBank) << 24 | nextObjectId++;
- else
- objectId = ((uint64_t) persistBank) << 24 | persistId;
-
- object->setObjectId (objectId);
- managementObjects[objectId] = object;
-}
-
-ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
-
-ManagementAgent::Periodic::~Periodic () {}
-
-void ManagementAgent::Periodic::fire ()
-{
- agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
- agent.PeriodicProcessing ();
-}
-
-void ManagementAgent::clientAdded (void)
-{
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = iter->second;
- object->setAllChanged ();
- }
-}
-
-void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet ('A');
- buf.putOctet ('M');
- buf.putOctet ('1');
- buf.putOctet (opcode);
- buf.putLong (seq);
-}
-
-bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
-
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
-
- return h1 == 'A' && h2 == 'M' && h3 == '1';
-}
-
-void ManagementAgent::SendBuffer (Buffer& buf,
- uint32_t length,
- broker::Exchange::shared_ptr exchange,
- string routingKey)
-{
- if (exchange.get() == 0)
- return;
-
- intrusive_ptr<Message> msg (new Message ());
- AMQFrame method (in_place<MessageTransferBody>(
- ProtocolVersion(), exchange->getName (), 0, 0));
- AMQFrame header (in_place<AMQHeaderBody>());
- AMQFrame content(in_place<AMQContentBody>());
-
- content.castBody<AMQContentBody>()->decode(buf, length);
-
- method.setEof (false);
- header.setBof (false);
- header.setEof (false);
- content.setBof (false);
-
- msg->getFrames().append(method);
- msg->getFrames().append(header);
-
- MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(length);
- msg->getFrames().append(content);
-
- DeliverableMessage deliverable (msg);
- exchange->route (deliverable, routingKey, 0);
-}
-
-void ManagementAgent::PeriodicProcessing (void)
-{
-#define BUFSIZE 65536
- Mutex::ScopedLock lock (userLock);
- char msgChars[BUFSIZE];
- uint32_t contentSize;
- string routingKey;
- std::list<uint64_t> deleteList;
-
- if (managementObjects.empty ())
- return;
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = iter->second;
-
- if (object->getConfigChanged () || object->isDeleted ())
- {
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
- object->writeConfig (msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
- if (object->getInstChanged ())
- {
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
- object->writeInstrumentation (msgBuffer);
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
- }
-
- if (object->isDeleted ())
- deleteList.push_back (iter->first);
- }
-
- // Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
- iter != deleteList.rend ();
- iter++)
- managementObjects.erase (*iter);
-
- deleteList.clear ();
-}
-
-void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
- uint32_t code, string text)
-{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'z', sequence);
- outBuffer.putLong (code);
- outBuffer.putShortString (text);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementAgent::dispatchCommand (Deliverable& deliverable,
- const string& routingKey,
- const FieldTable* /*args*/)
-{
- Mutex::ScopedLock lock (userLock);
- Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
-
- if (routingKey.compare (0, 13, "agent.method.") == 0)
- dispatchMethod (msg, routingKey, 13);
-
- else if (routingKey.length () == 5 &&
- routingKey.compare (0, 5, "agent") == 0)
- dispatchAgentCommand (msg);
-
- else
- {
- QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
- return;
- }
-}
-
-void ManagementAgent::dispatchMethod (Message& msg,
- const string& routingKey,
- size_t first)
-{
- size_t pos, start = first;
- uint32_t contentSize;
-
- if (routingKey.length () == start)
- {
- QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
- return;
- }
-
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
- return;
- }
-
- string packageName = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
- return;
- }
-
- string className = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- string methodName = routingKey.substr (start, routingKey.length () - start);
-
- contentSize = msg.encodedContentSize ();
- if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
- return;
-
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen, sequence;
- uint8_t opcode;
-
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
-
- if (!CheckHeader (inBuffer, &opcode, &sequence))
- {
- QPID_LOG (debug, " Invalid content header");
- return;
- }
-
- if (opcode != 'M')
- {
- QPID_LOG (debug, " Unexpected opcode " << opcode);
- return;
- }
-
- uint64_t objId = inBuffer.getLongLong ();
- string replyToKey;
-
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
- }
- else
- {
- QPID_LOG (debug, " Reply-to missing");
- return;
- }
-
- EncodeHeader (outBuffer, 'm', sequence);
-
- ManagementObjectMap::iterator iter = managementObjects.find (objId);
- if (iter == managementObjects.end () || iter->second->isDeleted ())
- {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
- }
- else
- {
- iter->second->doMethod (methodName, inBuffer, outBuffer);
- }
-
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence)
-{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'b', sequence);
- uuid.encode (outBuffer);
-
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence)
-{
- for (PackageMap::iterator pIter = packages.begin ();
- pIter != packages.end ();
- pIter++)
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'p', sequence);
- EncodePackageIndication (outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
-{
- std::string packageName;
-
- inBuffer.getShortString (packageName);
- FindOrAddPackage (packageName);
-}
-
-void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- std::string packageName;
-
- inBuffer.getShortString (packageName);
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
- ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin ();
- cIter != cMap.end ();
- cIter++)
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'q', sequence);
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- string packageName;
- SchemaClassKey key;
-
- inBuffer.getShortString (packageName);
- inBuffer.getShortString (key.name);
- inBuffer.getBin128 (key.hash);
-
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
- {
- ClassMap cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- SchemaClass classInfo = cIter->second;
-
- if (classInfo.writeSchemaCall != 0)
- {
- EncodeHeader (outBuffer, 's', sequence);
- classInfo.writeSchemaCall (outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- else
- {
- // TODO: Forward request to remote agent.
- }
-
- clientAdded ();
- // TODO: Send client-added to each remote agent.
- }
- }
-}
-
-uint32_t ManagementAgent::assignPrefix (uint32_t /*requestedPrefix*/)
-{
- // TODO: Allow remote agents to keep their requested prefixes if able.
- return nextRemotePrefix++;
-}
-
-void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- string label;
- uint32_t requestedPrefix;
- uint32_t assignedPrefix;
-
- inBuffer.getShortString (label);
- requestedPrefix = inBuffer.getLong ();
- assignedPrefix = assignPrefix (requestedPrefix);
-
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'a', sequence);
- outBuffer.putLong (assignedPrefix);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
-}
-
-void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
-{
- FieldTable ft;
- FieldTable::ValuePtr value;
-
- ft.decode (inBuffer);
- value = ft.get ("_class");
- if (value->empty () || !value->convertsTo<string> ())
- {
- // TODO: Send completion with an error code
- return;
- }
-
- string className (value->get<string> ());
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++)
- {
- ManagementObject::shared_ptr object = iter->second;
- if (object->getClassName () == className)
- {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'g', sequence);
- object->writeConfig (outBuffer);
- object->writeInstrumentation (outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- }
- }
-
- sendCommandComplete (replyToKey, sequence);
-}
-
-void ManagementAgent::dispatchAgentCommand (Message& msg)
-{
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
- uint32_t sequence;
- string replyToKey;
-
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
- }
- else
- return;
-
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
-
- if (!CheckHeader (inBuffer, &opcode, &sequence))
- return;
-
- if (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQuery (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageInd (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQuery (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaQuery (inBuffer, replyToKey, sequence);
- else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence);
- else if (opcode == 'G') handleGetRequest (inBuffer, replyToKey, sequence);
-}
-
-ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name)
-{
- PackageMap::iterator pIter = packages.find (name);
- if (pIter != packages.end ())
- return pIter;
-
- // No such package found, create a new map entry.
- pair<PackageMap::iterator, bool> result =
- packages.insert (pair<string, ClassMap> (name, ClassMap ()));
- QPID_LOG (debug, "ManagementAgent added package " << name);
-
- // Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
-
- return result.first;
-}
-
-void ManagementAgent::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- SchemaClassKey key;
- ClassMap& cMap = pIter->second;
-
- key.name = className;
- memcpy (&key.hash, md5Sum, 16);
-
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
- return;
-
- // No such class found, create a new class with local information.
- QPID_LOG (debug, "ManagementAgent added class " << pIter->first << "." <<
- key.name);
- SchemaClass classInfo;
-
- classInfo.writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
-}
-
-void ManagementAgent::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
-{
- buf.putShortString ((*pIter).first);
-}
-
-void ManagementAgent::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
-{
- SchemaClassKey key = (*cIter).first;
-
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
-}
-
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index 4cd679a035..c38e273c49 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -22,168 +22,28 @@
*
*/
-#include "qpid/Options.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Timer.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/sys/Mutex.h"
#include "ManagementObject.h"
-#include <qpid/framing/AMQFrame.h>
-#include <boost/shared_ptr.hpp>
namespace qpid {
namespace management {
class ManagementAgent
{
- private:
-
- ManagementAgent (std::string dataDir, uint16_t interval);
-
public:
- virtual ~ManagementAgent ();
+ virtual ~ManagementAgent () {}
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
- static void enableManagement (std::string dataDir, uint16_t interval);
static shared_ptr getAgent (void);
- static void shutdown (void);
-
- void setInterval (uint16_t _interval) { interval = _interval; }
- void setExchange (broker::Exchange::shared_ptr mgmtExchange,
- broker::Exchange::shared_ptr directExchange);
- void RegisterClass (std::string packageName,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void addObject (ManagementObject::shared_ptr object,
- uint32_t persistId = 0,
- uint32_t persistBank = 2);
- void clientAdded (void);
- void dispatchCommand (broker::Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
-
- private:
-
- struct Periodic : public broker::TimerTask
- {
- ManagementAgent& agent;
-
- Periodic (ManagementAgent& agent, uint32_t seconds);
- virtual ~Periodic ();
- void fire ();
- };
-
- // Storage for tracking remote management agents, attached via the client
- // management agent API.
- //
- struct RemoteAgent
- {
- std::string name;
- uint64_t objIdBase;
- };
-
- // TODO: Eventually replace string with entire reply-to structure. reply-to
- // currently assumes that the exchange is "amq.direct" even though it could
- // in theory be specified differently.
- typedef std::map<std::string, RemoteAgent> RemoteAgentMap;
- typedef std::vector<std::string> ReplyToVector;
-
- // Storage for known schema classes:
- //
- // SchemaClassKey -- Key elements for map lookups
- // SchemaClassKeyComp -- Comparison class for SchemaClassKey
- // SchemaClass -- Non-key elements for classes
- //
- struct SchemaClassKey
- {
- std::string name;
- uint8_t hash[16];
- };
-
- struct SchemaClassKeyComp
- {
- bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
- {
- if (lhs.name != rhs.name)
- return lhs.name < rhs.name;
- else
- for (int i = 0; i < 16; i++)
- if (lhs.hash[i] != rhs.hash[i])
- return lhs.hash[i] < rhs.hash[i];
- return false;
- }
- };
-
- struct SchemaClass
- {
- ManagementObject::writeSchemaCall_t writeSchemaCall;
- ReplyToVector remoteAgents;
-
- SchemaClass () : writeSchemaCall(0) {}
- };
-
- typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
- typedef std::map<std::string, ClassMap> PackageMap;
-
- RemoteAgentMap remoteAgents;
- PackageMap packages;
- ManagementObjectMap managementObjects;
-
- static shared_ptr agent;
- static bool enabled;
-
- qpid::framing::Uuid uuid;
- qpid::sys::Mutex userLock;
- broker::Timer timer;
- broker::Exchange::shared_ptr mExchange;
- broker::Exchange::shared_ptr dExchange;
- std::string dataDir;
- uint16_t interval;
- uint16_t bootSequence;
- uint32_t localBank;
- uint32_t nextObjectId;
- uint32_t nextRemotePrefix;
-
-# define MA_BUFFER_SIZE 65536
- char inputBuffer[MA_BUFFER_SIZE];
- char outputBuffer[MA_BUFFER_SIZE];
-
- void PeriodicProcessing (void);
- void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void SendBuffer (qpid::framing::Buffer& buf,
- uint32_t length,
- broker::Exchange::shared_ptr exchange,
- std::string routingKey);
-
- void dispatchMethod (broker::Message& msg,
- const std::string& routingKey,
- size_t first);
- void dispatchAgentCommand (broker::Message& msg);
- PackageMap::iterator FindOrAddPackage (std::string name);
- void AddClassLocal (PackageMap::iterator pIter,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void EncodePackageIndication (qpid::framing::Buffer& buf,
- PackageMap::iterator pIter);
- void EncodeClassIndication (qpid::framing::Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter);
- uint32_t assignPrefix (uint32_t requestedPrefix);
- void sendCommandComplete (std::string replyToKey, uint32_t sequence,
- uint32_t code = 0, std::string text = std::string("OK"));
- void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handlePackageInd (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleClassQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleSchemaQuery (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleGetRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ virtual void RegisterClass (std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall) = 0;
+ virtual void addObject (ManagementObject::shared_ptr object,
+ uint32_t persistId = 0,
+ uint32_t persistBank = 4) = 0;
};
}}
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
new file mode 100644
index 0000000000..6466028c00
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -0,0 +1,746 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementBroker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageDelivery.h>
+#include "qpid/framing/MessageTransferBody.h"
+#include <list>
+#include <iostream>
+#include <fstream>
+
+using boost::intrusive_ptr;
+using qpid::framing::Uuid;
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace std;
+
+ManagementAgent::shared_ptr ManagementBroker::agent;
+bool ManagementBroker::enabled = 0;
+
+ManagementBroker::RemoteAgent::~RemoteAgent ()
+{
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
+
+ManagementBroker::ManagementBroker (string _dataDir, uint16_t _interval, Manageable* _broker) :
+ dataDir (_dataDir), interval (_interval), broker (_broker)
+{
+ timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
+ localBank = 5;
+ nextObjectId = 1;
+ bootSequence = 1;
+ nextRemoteBank = 10;
+
+ // Get from file or generate and save to file.
+ if (dataDir.empty ())
+ {
+ uuid.generate ();
+ QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
+ << uuid);
+ }
+ else
+ {
+ string filename (dataDir + "/.mbrokerdata");
+ ifstream inFile (filename.c_str ());
+
+ if (inFile.good ())
+ {
+ inFile >> uuid;
+ inFile >> bootSequence;
+ inFile >> nextRemoteBank;
+ inFile.close ();
+ QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
+
+ bootSequence++;
+ writeData ();
+ }
+ else
+ {
+ uuid.generate ();
+ QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
+ writeData ();
+ }
+
+ QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
+ }
+}
+
+ManagementBroker::~ManagementBroker () {}
+
+void ManagementBroker::writeData ()
+{
+ string filename (dataDir + "/.mbrokerdata");
+ ofstream outFile (filename.c_str ());
+
+ if (outFile.good ())
+ {
+ outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
+ outFile.close ();
+ }
+}
+
+void ManagementBroker::enableManagement (string dataDir, uint16_t interval, Manageable* broker)
+{
+ enabled = 1;
+ if (agent.get () == 0)
+ agent = shared_ptr (new ManagementBroker (dataDir, interval, broker));
+}
+
+ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
+{
+ return ManagementBroker::agent;
+}
+
+void ManagementBroker::shutdown (void)
+{
+ if (agent.get () != 0)
+ {
+ ManagementBroker* broker = (ManagementBroker*) agent.get();
+
+ broker->mExchange.reset ();
+ broker->dExchange.reset ();
+ agent.reset ();
+ }
+}
+
+void ManagementBroker::setExchange (broker::Exchange::shared_ptr _mexchange,
+ broker::Exchange::shared_ptr _dexchange)
+{
+ mExchange = _mexchange;
+ dExchange = _dexchange;
+}
+
+void ManagementBroker::RegisterClass (string packageName,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock (userLock);
+ PackageMap::iterator pIter = FindOrAddPackage (packageName);
+ AddClassLocal (pIter, className, md5Sum, schemaCall);
+}
+
+void ManagementBroker::addObject (ManagementObject::shared_ptr object,
+ uint32_t persistId,
+ uint32_t persistBank)
+{
+ Mutex::ScopedLock lock (userLock);
+ uint64_t objectId;
+
+ if (persistId == 0)
+ objectId = ((uint64_t) bootSequence) << 48 |
+ ((uint64_t) localBank) << 24 | nextObjectId++;
+ else
+ objectId = ((uint64_t) persistBank) << 24 | persistId;
+
+ object->setObjectId (objectId);
+ managementObjects[objectId] = object;
+}
+
+ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
+ : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), broker(_broker) {}
+
+ManagementBroker::Periodic::~Periodic () {}
+
+void ManagementBroker::Periodic::fire ()
+{
+ broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
+ broker.PeriodicProcessing ();
+}
+
+void ManagementBroker::clientAdded (void)
+{
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ object->setAllChanged ();
+ }
+}
+
+void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+ buf.putOctet ('A');
+ buf.putOctet ('M');
+ buf.putOctet ('1');
+ buf.putOctet (opcode);
+ buf.putLong (seq);
+}
+
+bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+ uint8_t h1 = buf.getOctet ();
+ uint8_t h2 = buf.getOctet ();
+ uint8_t h3 = buf.getOctet ();
+
+ *opcode = buf.getOctet ();
+ *seq = buf.getLong ();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '1';
+}
+
+void ManagementBroker::SendBuffer (Buffer& buf,
+ uint32_t length,
+ broker::Exchange::shared_ptr exchange,
+ string routingKey)
+{
+ if (exchange.get() == 0)
+ return;
+
+ intrusive_ptr<Message> msg (new Message ());
+ AMQFrame method (in_place<MessageTransferBody>(
+ ProtocolVersion(), exchange->getName (), 0, 0));
+ AMQFrame header (in_place<AMQHeaderBody>());
+ AMQFrame content(in_place<AMQContentBody>());
+
+ content.castBody<AMQContentBody>()->decode(buf, length);
+
+ method.setEof (false);
+ header.setBof (false);
+ header.setEof (false);
+ content.setBof (false);
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+
+ MessageProperties* props =
+ msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(length);
+ msg->getFrames().append(content);
+
+ DeliverableMessage deliverable (msg);
+ exchange->route (deliverable, routingKey, 0);
+}
+
+void ManagementBroker::PeriodicProcessing (void)
+{
+#define BUFSIZE 65536
+ Mutex::ScopedLock lock (userLock);
+ char msgChars[BUFSIZE];
+ uint32_t contentSize;
+ string routingKey;
+ std::list<uint64_t> deleteList;
+
+ if (managementObjects.empty ())
+ return;
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+
+ if (object->getConfigChanged () || object->isDeleted ())
+ {
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer, 'c');
+ object->writeConfig (msgBuffer);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ }
+
+ if (object->getInstChanged ())
+ {
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ EncodeHeader (msgBuffer, 'i');
+ object->writeInstrumentation (msgBuffer);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName ();
+ SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ }
+
+ if (object->isDeleted ())
+ deleteList.push_back (iter->first);
+ }
+
+ // Delete flagged objects
+ for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+ iter != deleteList.rend ();
+ iter++)
+ managementObjects.erase (*iter);
+
+ deleteList.clear ();
+}
+
+void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
+ uint32_t code, string text)
+{
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'z', sequence);
+ outBuffer.putLong (code);
+ outBuffer.putShortString (text);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementBroker::dispatchCommand (Deliverable& deliverable,
+ const string& routingKey,
+ const FieldTable* /*args*/)
+{
+ Mutex::ScopedLock lock (userLock);
+ Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
+
+ if (routingKey.compare (0, 13, "agent.method.") == 0)
+ dispatchMethodLH (msg, routingKey, 13);
+
+ else if (routingKey.length () == 5 &&
+ routingKey.compare (0, 5, "agent") == 0)
+ dispatchAgentCommandLH (msg);
+
+ else
+ {
+ QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
+ return;
+ }
+}
+
+void ManagementBroker::dispatchMethodLH (Message& msg,
+ const string& routingKey,
+ size_t first)
+{
+ size_t pos, start = first;
+ uint32_t contentSize;
+
+ if (routingKey.length () == start)
+ {
+ QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
+ return;
+ }
+
+ pos = routingKey.find ('.', start);
+ if (pos == string::npos || routingKey.length () == pos + 1)
+ {
+ QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
+ return;
+ }
+
+ string packageName = routingKey.substr (start, pos - start);
+
+ start = pos + 1;
+ pos = routingKey.find ('.', start);
+ if (pos == string::npos || routingKey.length () == pos + 1)
+ {
+ QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
+ return;
+ }
+
+ string className = routingKey.substr (start, pos - start);
+
+ start = pos + 1;
+ string methodName = routingKey.substr (start, routingKey.length () - start);
+
+ contentSize = msg.encodedContentSize ();
+ if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
+ return;
+
+ Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen, sequence;
+ uint8_t opcode;
+
+ msg.encodeContent (inBuffer);
+ inBuffer.reset ();
+
+ if (!CheckHeader (inBuffer, &opcode, &sequence))
+ {
+ QPID_LOG (debug, " Invalid content header");
+ return;
+ }
+
+ if (opcode != 'M')
+ {
+ QPID_LOG (debug, " Unexpected opcode " << opcode);
+ return;
+ }
+
+ uint64_t objId = inBuffer.getLongLong ();
+ string replyToKey;
+
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo())
+ {
+ const framing::ReplyTo& rt = p->getReplyTo ();
+ replyToKey = rt.getRoutingKey ();
+ }
+ else
+ {
+ QPID_LOG (debug, " Reply-to missing");
+ return;
+ }
+
+ EncodeHeader (outBuffer, 'm', sequence);
+
+ ManagementObjectMap::iterator iter = managementObjects.find (objId);
+ if (iter == managementObjects.end () || iter->second->isDeleted ())
+ {
+ outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+ }
+ else
+ {
+ iter->second->doMethod (methodName, inBuffer, outBuffer);
+ }
+
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
+{
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'b', sequence);
+ uuid.encode (outBuffer);
+
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
+{
+ for (PackageMap::iterator pIter = packages.begin ();
+ pIter != packages.end ();
+ pIter++)
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'p', sequence);
+ EncodePackageIndication (outBuffer, pIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ sendCommandComplete (replyToKey, sequence);
+}
+
+void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
+{
+ std::string packageName;
+
+ inBuffer.getShortString (packageName);
+ FindOrAddPackage (packageName);
+}
+
+void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ std::string packageName;
+
+ inBuffer.getShortString (packageName);
+ PackageMap::iterator pIter = packages.find (packageName);
+ if (pIter != packages.end ())
+ {
+ ClassMap cMap = pIter->second;
+ for (ClassMap::iterator cIter = cMap.begin ();
+ cIter != cMap.end ();
+ cIter++)
+ {
+ if (cIter->second.hasSchema ())
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'q', sequence);
+ EncodeClassIndication (outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+ }
+ }
+
+ sendCommandComplete (replyToKey, sequence);
+}
+
+void ManagementBroker::SchemaClass::appendSchema (Buffer& buf)
+{
+ // If the management package is attached locally (embedded in the broker or
+ // linked in via plug-in), call the schema handler directly. If the package
+ // is from a remote management agent, send the stored schema information.
+
+ if (writeSchemaCall != 0)
+ writeSchemaCall (buf);
+ else
+ buf.putRawData (buffer, bufferLen);
+}
+
+void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ string packageName;
+ SchemaClassKey key;
+
+ inBuffer.getShortString (packageName);
+ inBuffer.getShortString (key.name);
+ inBuffer.getBin128 (key.hash);
+
+ PackageMap::iterator pIter = packages.find (packageName);
+ if (pIter != packages.end ())
+ {
+ ClassMap cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find (key);
+ if (cIter != cMap.end ())
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ SchemaClass classInfo = cIter->second;
+
+ if (classInfo.hasSchema())
+ {
+ EncodeHeader (outBuffer, 's', sequence);
+ classInfo.appendSchema (outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ clientAdded ();
+ // TODO: Send client-added to each remote agent.
+ }
+ }
+}
+
+bool ManagementBroker::bankInUse (uint32_t bank)
+{
+ for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+ aIter != remoteAgents.end();
+ aIter++)
+ if (aIter->second->objIdBank == bank)
+ return true;
+ return false;
+}
+
+uint32_t ManagementBroker::allocateNewBank ()
+{
+ while (bankInUse (nextRemoteBank))
+ nextRemoteBank++;
+
+ uint32_t allocated = nextRemoteBank++;
+ writeData ();
+ return allocated;
+}
+
+uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank)
+{
+ if (requestedBank == 0 || bankInUse (requestedBank))
+ return allocateNewBank ();
+ return requestedBank;
+}
+
+void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ string label;
+ uint32_t requestedBank;
+ uint32_t assignedBank;
+ Uuid sessionId;
+ Uuid systemId;
+
+ inBuffer.getShortString (label);
+ sessionId.decode (inBuffer);
+ systemId.decode (inBuffer);
+ requestedBank = inBuffer.getLong ();
+ assignedBank = assignBankLH (requestedBank);
+
+ RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId);
+ if (aIter != remoteAgents.end())
+ {
+ // There already exists an agent on this session. Reject the request.
+ sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent");
+ return;
+ }
+
+ RemoteAgent* agent = new RemoteAgent;
+ agent->objIdBank = assignedBank;
+ agent->mgmtObject = management::Agent::shared_ptr
+ (new management::Agent (agent));
+ agent->mgmtObject->set_sessionId (sessionId);
+ agent->mgmtObject->set_label (label);
+ agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
+ agent->mgmtObject->set_sysId (systemId);
+ addObject (agent->mgmtObject);
+
+ remoteAgents[sessionId] = agent;
+
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'a', sequence);
+ outBuffer.putLong (assignedBank);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
+void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+
+ ft.decode (inBuffer);
+ value = ft.get ("_class");
+ if (value->empty () || !value->convertsTo<string> ())
+ {
+ // TODO: Send completion with an error code
+ return;
+ }
+
+ string className (value->get<string> ());
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ if (object->getClassName () == className)
+ {
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'g', sequence);
+ object->writeConfig (outBuffer);
+ object->writeInstrumentation (outBuffer, true);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ }
+ }
+
+ sendCommandComplete (replyToKey, sequence);
+}
+
+void ManagementBroker::dispatchAgentCommandLH (Message& msg)
+{
+ Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode;
+ uint32_t sequence;
+ string replyToKey;
+
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo())
+ {
+ const framing::ReplyTo& rt = p->getReplyTo ();
+ replyToKey = rt.getRoutingKey ();
+ }
+ else
+ return;
+
+ msg.encodeContent (inBuffer);
+ inBuffer.reset ();
+
+ if (!CheckHeader (inBuffer, &opcode, &sequence))
+ return;
+
+ if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+}
+
+ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name)
+{
+ PackageMap::iterator pIter = packages.find (name);
+ if (pIter != packages.end ())
+ return pIter;
+
+ // No such package found, create a new map entry.
+ pair<PackageMap::iterator, bool> result =
+ packages.insert (pair<string, ClassMap> (name, ClassMap ()));
+ QPID_LOG (debug, "ManagementBroker added package " << name);
+
+ // Publish a package-indication message
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader (outBuffer, 'p');
+ EncodePackageIndication (outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outBuffer.reset ();
+ SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
+
+ return result.first;
+}
+
+void ManagementBroker::AddClassLocal (PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ SchemaClassKey key;
+ ClassMap& cMap = pIter->second;
+
+ key.name = className;
+ memcpy (&key.hash, md5Sum, 16);
+
+ ClassMap::iterator cIter = cMap.find (key);
+ if (cIter != cMap.end ())
+ return;
+
+ // No such class found, create a new class with local information.
+ QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
+ key.name);
+ SchemaClass classInfo;
+
+ classInfo.writeSchemaCall = schemaCall;
+ cMap[key] = classInfo;
+
+ // TODO: Publish a class-indication message
+}
+
+void ManagementBroker::EncodePackageIndication (Buffer& buf,
+ PackageMap::iterator pIter)
+{
+ buf.putShortString ((*pIter).first);
+}
+
+void ManagementBroker::EncodeClassIndication (Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
+{
+ SchemaClassKey key = (*cIter).first;
+
+ buf.putShortString ((*pIter).first);
+ buf.putShortString (key.name);
+ buf.putBin128 (key.hash);
+}
+
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h
new file mode 100644
index 0000000000..2e02cb2a43
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementBroker.h
@@ -0,0 +1,203 @@
+#ifndef _ManagementBroker_
+#define _ManagementBroker_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Timer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "ManagementAgent.h"
+#include "ManagementObject.h"
+#include "Manageable.h"
+#include "qpid/management/Agent.h"
+#include <qpid/framing/AMQFrame.h>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace management {
+
+class ManagementBroker : public ManagementAgent
+{
+ private:
+
+ ManagementBroker (std::string dataDir, uint16_t interval, Manageable* broker);
+
+ public:
+
+ virtual ~ManagementBroker ();
+
+ static void enableManagement (std::string dataDir, uint16_t interval, Manageable* broker);
+ static shared_ptr getAgent (void);
+ static void shutdown (void);
+
+ void setInterval (uint16_t _interval) { interval = _interval; }
+ void setExchange (broker::Exchange::shared_ptr mgmtExchange,
+ broker::Exchange::shared_ptr directExchange);
+ void RegisterClass (std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void addObject (ManagementObject::shared_ptr object,
+ uint32_t persistId = 0,
+ uint32_t persistBank = 4);
+ void clientAdded (void);
+ void dispatchCommand (broker::Deliverable& msg,
+ const std::string& routingKey,
+ const framing::FieldTable* args);
+
+ private:
+ friend class ManagementAgent;
+
+ struct Periodic : public broker::TimerTask
+ {
+ ManagementBroker& broker;
+
+ Periodic (ManagementBroker& broker, uint32_t seconds);
+ virtual ~Periodic ();
+ void fire ();
+ };
+
+ // Storage for tracking remote management agents, attached via the client
+ // management agent API.
+ //
+ struct RemoteAgent : public Manageable
+ {
+ uint32_t objIdBank;
+ Agent::shared_ptr mgmtObject;
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
+ virtual ~RemoteAgent ();
+ };
+
+ // TODO: Eventually replace string with entire reply-to structure. reply-to
+ // currently assumes that the exchange is "amq.direct" even though it could
+ // in theory be specified differently.
+ typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
+
+ // Storage for known schema classes:
+ //
+ // SchemaClassKey -- Key elements for map lookups
+ // SchemaClassKeyComp -- Comparison class for SchemaClassKey
+ // SchemaClass -- Non-key elements for classes
+ //
+ struct SchemaClassKey
+ {
+ std::string name;
+ uint8_t hash[16];
+ };
+
+ struct SchemaClassKeyComp
+ {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ struct SchemaClass
+ {
+ ManagementObject::writeSchemaCall_t writeSchemaCall;
+ ReplyToVector remoteAgents;
+ size_t bufferLen;
+ uint8_t* buffer;
+
+ SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {}
+ bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
+ void appendSchema (framing::Buffer& buf);
+ };
+
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ RemoteAgentMap remoteAgents;
+ PackageMap packages;
+ ManagementObjectMap managementObjects;
+
+ static shared_ptr agent;
+ static bool enabled;
+
+ framing::Uuid uuid;
+ sys::Mutex userLock;
+ broker::Timer timer;
+ broker::Exchange::shared_ptr mExchange;
+ broker::Exchange::shared_ptr dExchange;
+ std::string dataDir;
+ uint16_t interval;
+ Manageable* broker;
+ uint16_t bootSequence;
+ uint32_t localBank;
+ uint32_t nextObjectId;
+ uint32_t nextRemoteBank;
+
+# define MA_BUFFER_SIZE 65536
+ char inputBuffer[MA_BUFFER_SIZE];
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ void writeData ();
+ void PeriodicProcessing (void);
+ void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void SendBuffer (framing::Buffer& buf,
+ uint32_t length,
+ broker::Exchange::shared_ptr exchange,
+ std::string routingKey);
+
+ void dispatchMethodLH (broker::Message& msg,
+ const std::string& routingKey,
+ size_t first);
+ void dispatchAgentCommandLH (broker::Message& msg);
+
+ PackageMap::iterator FindOrAddPackage (std::string name);
+ void AddClassLocal (PackageMap::iterator pIter,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void EncodePackageIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void EncodeClassIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter);
+ bool bankInUse (uint32_t bank);
+ uint32_t allocateNewBank ();
+ uint32_t assignBankLH (uint32_t requestedPrefix);
+ void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+ uint32_t code = 0, std::string text = std::string("OK"));
+ void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+};
+
+}}
+
+#endif /*!_ManagementBroker_*/
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
index c589aefba0..28e6fb8d0a 100644
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -53,7 +53,7 @@ void ManagementExchange::route (Deliverable& msg,
TopicExchange::route (msg, routingKey, args);
}
-void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent)
+void ManagementExchange::setManagmentAgent (ManagementBroker* agent)
{
managementAgent = agent;
}
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index 7faec32b0f..28066b1e80 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -22,7 +22,7 @@
#define _ManagementExchange_
#include "qpid/broker/TopicExchange.h"
-#include "ManagementAgent.h"
+#include "ManagementBroker.h"
namespace qpid {
namespace broker {
@@ -30,7 +30,7 @@ namespace broker {
class ManagementExchange : public virtual TopicExchange
{
private:
- management::ManagementAgent::shared_ptr managementAgent;
+ management::ManagementBroker* managementAgent;
public:
static const std::string typeName;
@@ -46,7 +46,7 @@ class ManagementExchange : public virtual TopicExchange
const string& routingKey,
const qpid::framing::FieldTable* args);
- void setManagmentAgent (management::ManagementAgent::shared_ptr agent);
+ void setManagmentAgent (management::ManagementBroker* agent);
virtual ~ManagementExchange();
};
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 48a3372d16..047f8c5754 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -22,7 +22,6 @@
*
*/
-#include "Manageable.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Mutex.h"
#include <qpid/framing/Buffer.h>