summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-05-11 14:16:52 +0000
committerTed Ross <tross@apache.org>2009-05-11 14:16:52 +0000
commita1b440e5393206ec5833e2d6c2617c2aca71701f (patch)
treeedbe7aad7a01122986380860c4cedd95086a282a /cpp/src/qpid/management
parentec0e348d1d14679f72ce704555dd2605880bddfa (diff)
downloadqpid-python-a1b440e5393206ec5833e2d6c2617c2aca71701f.tar.gz
QPID-1843 - Cleaned up the interface to the broker's internal management agent.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@773570 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp (renamed from cpp/src/qpid/management/ManagementBroker.cpp)152
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h (renamed from cpp/src/qpid/management/ManagementBroker.h)68
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp12
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h10
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp4
-rw-r--r--cpp/src/qpid/management/ManagementObject.h10
6 files changed, 115 insertions, 141 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 19300ef1af..77277070d9 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -19,7 +19,8 @@
*
*/
-#include "ManagementBroker.h"
+#include "ManagementAgent.h"
+#include "ManagementObject.h"
#include "IdAllocator.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/log/Statement.h"
@@ -41,45 +42,13 @@ using namespace qpid::sys;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
-Mutex ManagementAgent::Singleton::lock;
-bool ManagementAgent::Singleton::disabled = false;
-ManagementAgent* ManagementAgent::Singleton::agent = 0;
-int ManagementAgent::Singleton::refCount = 0;
-
-ManagementAgent::Singleton::Singleton(bool disableManagement)
-{
- Mutex::ScopedLock _lock(lock);
- if (disableManagement && !disabled) {
- disabled = true;
- assert(refCount == 0); // can't disable after agent has been allocated
- }
- if (refCount == 0 && !disabled)
- agent = new ManagementBroker();
- refCount++;
-}
-
-ManagementAgent::Singleton::~Singleton()
-{
- Mutex::ScopedLock _lock(lock);
- refCount--;
- if (refCount == 0 && !disabled) {
- delete agent;
- agent = 0;
- }
-}
-
-ManagementAgent* ManagementAgent::Singleton::getInstance()
-{
- return agent;
-}
-
-ManagementBroker::RemoteAgent::~RemoteAgent ()
+ManagementAgent::RemoteAgent::~RemoteAgent ()
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy();
}
-ManagementBroker::ManagementBroker () :
+ManagementAgent::ManagementAgent () :
threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now())))
{
nextObjectId = 1;
@@ -90,7 +59,7 @@ ManagementBroker::ManagementBroker () :
clientWasAdded = false;
}
-ManagementBroker::~ManagementBroker ()
+ManagementAgent::~ManagementAgent ()
{
timer.stop();
{
@@ -114,20 +83,21 @@ ManagementBroker::~ManagementBroker ()
}
}
-void ManagementBroker::configure(const string& _dataDir, uint16_t _interval,
+void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
qpid::broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
interval = _interval;
broker = _broker;
threadPoolSize = _threads;
+ ManagementObject::maxThreads = threadPoolSize;
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
// Get from file or generate and save to file.
if (dataDir.empty())
{
uuid.generate();
- QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
+ QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
<< uuid);
}
else
@@ -141,7 +111,7 @@ void ManagementBroker::configure(const string& _dataDir, uint16_t _interval,
inFile >> bootSequence;
inFile >> nextRemoteBank;
inFile.close();
- QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
+ QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
// if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
bootSequence++;
@@ -152,15 +122,15 @@ void ManagementBroker::configure(const string& _dataDir, uint16_t _interval,
else
{
uuid.generate();
- QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
+ QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
writeData();
}
- QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
+ QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence);
}
}
-void ManagementBroker::writeData ()
+void ManagementAgent::writeData ()
{
string filename (dataDir + "/.mbrokerdata");
ofstream outFile (filename.c_str ());
@@ -172,14 +142,14 @@ void ManagementBroker::writeData ()
}
}
-void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
+void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
qpid::broker::Exchange::shared_ptr _dexchange)
{
mExchange = _mexchange;
dExchange = _dexchange;
}
-void ManagementBroker::registerClass (const string& packageName,
+void ManagementAgent::registerClass (const string& packageName,
const string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
@@ -189,7 +159,7 @@ void ManagementBroker::registerClass (const string& packageName,
addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
}
-void ManagementBroker::registerEvent (const string& packageName,
+void ManagementAgent::registerEvent (const string& packageName,
const string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
@@ -199,7 +169,7 @@ void ManagementBroker::registerEvent (const string& packageName,
addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
-ObjectId ManagementBroker::addObject (ManagementObject* object,
+ObjectId ManagementAgent::addObject (ManagementObject* object,
uint64_t persistId)
{
Mutex::ScopedLock lock (addLock);
@@ -221,7 +191,7 @@ ObjectId ManagementBroker::addObject (ManagementObject* object,
return objId;
}
-void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t severity)
+void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
{
Mutex::ScopedLock lock (userLock);
Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
@@ -241,18 +211,18 @@ void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t sever
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
}
-ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
- : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {}
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
+ : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), agent(_agent) {}
-ManagementBroker::Periodic::~Periodic () {}
+ManagementAgent::Periodic::~Periodic () {}
-void ManagementBroker::Periodic::fire ()
+void ManagementAgent::Periodic::fire ()
{
- broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
- broker.periodicProcessing ();
+ agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
+ agent.periodicProcessing ();
}
-void ManagementBroker::clientAdded (const std::string& routingKey)
+void ManagementAgent::clientAdded (const std::string& routingKey)
{
if (routingKey.find("console") != 0)
return;
@@ -272,7 +242,7 @@ void ManagementBroker::clientAdded (const std::string& routingKey)
}
}
-void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
buf.putOctet ('M');
@@ -281,7 +251,7 @@ void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
buf.putLong (seq);
}
-bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
uint8_t h1 = buf.getOctet();
uint8_t h2 = buf.getOctet();
@@ -293,7 +263,7 @@ bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementBroker::sendBuffer(Buffer& buf,
+void ManagementAgent::sendBuffer(Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
string routingKey)
@@ -327,7 +297,7 @@ void ManagementBroker::sendBuffer(Buffer& buf,
} catch(exception&) {}
}
-void ManagementBroker::moveNewObjectsLH()
+void ManagementAgent::moveNewObjectsLH()
{
Mutex::ScopedLock lock (addLock);
for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
@@ -337,7 +307,7 @@ void ManagementBroker::moveNewObjectsLH()
newManagementObjects.clear();
}
-void ManagementBroker::periodicProcessing (void)
+void ManagementAgent::periodicProcessing (void)
{
#define BUFSIZE 65536
Mutex::ScopedLock lock (userLock);
@@ -421,7 +391,7 @@ void ManagementBroker::periodicProcessing (void)
}
}
-void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence,
+void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
uint32_t code, string text)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -435,7 +405,7 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
+bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
const FieldTable* /*args*/)
{
@@ -471,7 +441,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
return true;
}
-void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
+void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
uint32_t sequence, const ConnectionToken* connToken)
{
string methodName;
@@ -532,7 +502,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
+void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
@@ -545,7 +515,7 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
+void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
{
for (PackageMap::iterator pIter = packages.begin ();
pIter != packages.end ();
@@ -564,7 +534,7 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
+void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
{
string packageName;
@@ -572,7 +542,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey
findOrAddPackageLH(packageName);
}
-void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
@@ -601,7 +571,7 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u
sendCommandComplete(replyToKey, sequence);
}
-void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
+void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
{
string packageName;
SchemaClassKey key;
@@ -633,7 +603,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui
}
}
-void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
+void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
{
// If the management package is attached locally (embedded in the broker or
// linked in via plug-in), call the schema handler directly. If the package
@@ -645,7 +615,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
buf.putRawData(buffer, bufferLen);
}
-void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -680,7 +650,7 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey
sendCommandComplete(replyToKey, sequence, 1, "Package not found");
}
-void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -699,7 +669,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
size_t length = validateSchema(inBuffer, cIter->second.kind);
if (length == 0) {
- QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
+ QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
} else {
cIter->second.buffer = (uint8_t*) malloc(length);
@@ -720,7 +690,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo
}
}
-bool ManagementBroker::bankInUse (uint32_t bank)
+bool ManagementAgent::bankInUse (uint32_t bank)
{
for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
aIter != remoteAgents.end();
@@ -730,7 +700,7 @@ bool ManagementBroker::bankInUse (uint32_t bank)
return false;
}
-uint32_t ManagementBroker::allocateNewBank ()
+uint32_t ManagementAgent::allocateNewBank ()
{
while (bankInUse (nextRemoteBank))
nextRemoteBank++;
@@ -740,14 +710,14 @@ uint32_t ManagementBroker::allocateNewBank ()
return allocated;
}
-uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank)
+uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank)
{
if (requestedBank == 0 || bankInUse (requestedBank))
return allocateNewBank ();
return requestedBank;
}
-void ManagementBroker::deleteOrphanedAgentsLH()
+void ManagementAgent::deleteOrphanedAgentsLH()
{
vector<ObjectId> deleteList;
@@ -776,7 +746,7 @@ void ManagementBroker::deleteOrphanedAgentsLH()
deleteList.clear();
}
-void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
+void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
uint32_t requestedBrokerBank, requestedAgentBank;
@@ -827,7 +797,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
FieldTable ft;
FieldTable::ValuePtr value;
@@ -887,7 +857,7 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui
sendCommandComplete(replyToKey, sequence);
}
-bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
+bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
@@ -951,7 +921,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
return true;
}
-void ManagementBroker::dispatchAgentCommandLH(Message& msg)
+void ManagementAgent::dispatchAgentCommandLH(Message& msg)
{
Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
@@ -968,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg)
return;
if (msg.encodedSize() > MA_BUFFER_SIZE) {
- QPID_LOG(debug, "ManagementBroker::dispatchAgentCommandLH: Message too large: " <<
+ QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
msg.encodedSize());
return;
}
@@ -994,7 +964,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg)
}
}
-ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(string name)
+ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -1003,7 +973,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri
// No such package found, create a new map entry.
pair<PackageMap::iterator, bool> result =
packages.insert(pair<string, ClassMap>(name, ClassMap()));
- QPID_LOG (debug, "ManagementBroker added package " << name);
+ QPID_LOG (debug, "ManagementAgent added package " << name);
// Publish a package-indication message
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1018,7 +988,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri
return result.first;
}
-void ManagementBroker::addClassLH(uint8_t kind,
+void ManagementAgent::addClassLH(uint8_t kind,
PackageMap::iterator pIter,
const string& className,
uint8_t* md5Sum,
@@ -1035,20 +1005,20 @@ void ManagementBroker::addClassLH(uint8_t kind,
return;
// No such class found, create a new class with local information.
- QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" <<
+ QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" <<
key.name);
cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
cIter = cMap.find(key);
}
-void ManagementBroker::encodePackageIndication(Buffer& buf,
+void ManagementAgent::encodePackageIndication(Buffer& buf,
PackageMap::iterator pIter)
{
buf.putShortString((*pIter).first);
}
-void ManagementBroker::encodeClassIndication(Buffer& buf,
+void ManagementAgent::encodeClassIndication(Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter)
{
@@ -1060,7 +1030,7 @@ void ManagementBroker::encodeClassIndication(Buffer& buf,
buf.putBin128(key.hash);
}
-size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind)
+size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind)
{
if (kind == ManagementItem::CLASS_KIND_TABLE)
return validateTableSchema(inBuffer);
@@ -1069,7 +1039,7 @@ size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind)
return 0;
}
-size_t ManagementBroker::validateTableSchema(Buffer& inBuffer)
+size_t ManagementAgent::validateTableSchema(Buffer& inBuffer)
{
uint32_t start = inBuffer.getPosition();
uint32_t end;
@@ -1115,7 +1085,7 @@ size_t ManagementBroker::validateTableSchema(Buffer& inBuffer)
return end - start;
}
-size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
+size_t ManagementAgent::validateEventSchema(Buffer& inBuffer)
{
uint32_t start = inBuffer.getPosition();
uint32_t end;
@@ -1147,13 +1117,13 @@ size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
return end - start;
}
-void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a)
+void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
{
Mutex::ScopedLock lock (addLock);
allocator = a;
}
-uint64_t ManagementBroker::allocateId(Manageable* object)
+uint64_t ManagementAgent::allocateId(Manageable* object)
{
Mutex::ScopedLock lock (addLock);
if (allocator.get()) return allocator->getIdFor(object);
diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementAgent.h
index a57f73be15..2411e6c277 100644
--- a/cpp/src/qpid/management/ManagementBroker.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -1,5 +1,5 @@
-#ifndef _ManagementBroker_
-#define _ManagementBroker_
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
/*
*
@@ -21,14 +21,15 @@
* under the License.
*
*/
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/Timer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "qpid/broker/ConnectionToken.h"
-#include "qpid/agent/ManagementAgent.h"
#include "ManagementObject.h"
+#include "ManagementEvent.h"
#include "Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
#include <qpid/framing/AMQFrame.h>
@@ -39,15 +40,27 @@ namespace management {
struct IdAllocator;
-class ManagementBroker : public ManagementAgent
+class ManagementAgent
{
private:
int threadPoolSize;
public:
- ManagementBroker ();
- virtual ~ManagementBroker ();
+ typedef enum {
+ SEV_EMERG = 0,
+ SEV_ALERT = 1,
+ SEV_CRIT = 2,
+ SEV_ERROR = 3,
+ SEV_WARN = 4,
+ SEV_NOTE = 5,
+ SEV_INFO = 6,
+ SEV_DEBUG = 7,
+ SEV_DEFAULT = 8
+ } severity_t;
+
+ ManagementAgent ();
+ virtual ~ManagementAgent ();
void configure (const std::string& dataDir, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
@@ -55,41 +68,34 @@ public:
void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
int getMaxThreads () { return threadPoolSize; }
- void registerClass (const std::string& packageName,
- const std::string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void registerEvent (const std::string& packageName,
- const std::string& eventName,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0);
- void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT);
- void clientAdded (const std::string& routingKey);
+ QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
+ const std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ QPID_BROKER_EXTERN void registerEvent (const std::string& packageName,
+ const std::string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
+ uint64_t persistId = 0);
+ QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
+ severity_t severity = SEV_DEFAULT);
+ QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
+
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
- const framing::Uuid& getUuid() const { return uuid; }
- // Stubs for remote management agent calls
- void init(const std::string&, uint16_t, uint16_t, bool,
- const std::string&, const std::string&, const std::string&,
- const std::string&, const std::string&) { assert(0); }
- void init(const client::ConnectionSettings&, uint16_t, bool, const std::string&) { assert(0); }
- uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
- int getSignalFd () { assert(0); return -1; }
+ const framing::Uuid& getUuid() const { return uuid; }
void setAllocator(std::auto_ptr<IdAllocator> allocator);
uint64_t allocateId(Manageable* object);
private:
- friend class ManagementAgent;
-
struct Periodic : public qpid::broker::TimerTask
{
- ManagementBroker& broker;
+ ManagementAgent& agent;
- Periodic (ManagementBroker& broker, uint32_t seconds);
+ Periodic (ManagementAgent& agent, uint32_t seconds);
virtual ~Periodic ();
void fire ();
};
@@ -239,4 +245,4 @@ private:
}}
-#endif /*!_ManagementBroker_*/
+#endif /*!_ManagementAgent_*/
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
index 4dcafbfcdd..0793b2d18c 100644
--- a/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -27,14 +27,14 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) :
- Exchange (_name, _parent), TopicExchange(_name, _parent) {}
+ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) :
+ Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
ManagementExchange::ManagementExchange (const std::string& _name,
bool _durable,
const FieldTable& _args,
- Manageable* _parent) :
- Exchange (_name, _durable, _args, _parent),
- TopicExchange(_name, _durable, _args, _parent) {}
+ Manageable* _parent, Broker* b) :
+ Exchange (_name, _durable, _args, _parent, b),
+ TopicExchange(_name, _durable, _args, _parent, b) {}
void ManagementExchange::route (Deliverable& msg,
const string& routingKey,
@@ -60,7 +60,7 @@ bool ManagementExchange::bind (Queue::shared_ptr queue,
return TopicExchange::bind(queue, routingKey, args);
}
-void ManagementExchange::setManagmentAgent (ManagementBroker* agent)
+void ManagementExchange::setManagmentAgent (ManagementAgent* agent)
{
managementAgent = agent;
}
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index d54db1a74e..5e51683515 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -22,7 +22,7 @@
#define _ManagementExchange_
#include "qpid/broker/TopicExchange.h"
-#include "ManagementBroker.h"
+#include "ManagementAgent.h"
namespace qpid {
namespace broker {
@@ -30,15 +30,15 @@ namespace broker {
class ManagementExchange : public virtual TopicExchange
{
private:
- management::ManagementBroker* managementAgent;
+ management::ManagementAgent* managementAgent;
public:
static const std::string typeName;
- ManagementExchange (const string& name, Manageable* _parent = 0);
+ ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0);
ManagementExchange (const string& _name, bool _durable,
const qpid::framing::FieldTable& _args,
- Manageable* _parent = 0);
+ Manageable* _parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
@@ -50,7 +50,7 @@ class ManagementExchange : public virtual TopicExchange
const string& routingKey,
const qpid::framing::FieldTable* args);
- void setManagmentAgent (management::ManagementBroker* agent);
+ void setManagmentAgent (management::ManagementAgent* agent);
virtual ~ManagementExchange();
};
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index f4c45de126..08008b3d79 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -21,7 +21,6 @@
#include "Manageable.h"
#include "ManagementObject.h"
-#include "qpid/agent/ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Thread.h"
@@ -156,6 +155,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i)
}}
+int ManagementObject::maxThreads = 1;
int ManagementObject::nextThreadIndex = 0;
void ManagementObject::writeTimestamps (framing::Buffer& buf)
@@ -176,7 +176,7 @@ int ManagementObject::getThreadIndex() {
if (thisIndex == -1) {
sys::Mutex::ScopedLock mutex(accessLock);
thisIndex = nextThreadIndex;
- if (nextThreadIndex < agent->getMaxThreads() - 1)
+ if (nextThreadIndex < maxThreads - 1)
nextThreadIndex++;
}
return thisIndex;
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index 498169318d..15c2307886 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -32,7 +32,6 @@ namespace qpid {
namespace management {
class Manageable;
-class ManagementAgent;
class ObjectId;
@@ -111,7 +110,7 @@ public:
class ManagementObject : public ManagementItem
{
- protected:
+protected:
uint64_t createTime;
uint64_t destroyTime;
@@ -122,8 +121,6 @@ class ManagementObject : public ManagementItem
bool deleted;
Manageable* coreObject;
sys::Mutex accessLock;
- ManagementAgent* agent;
- int maxThreads;
uint32_t flags;
static int nextThreadIndex;
@@ -133,13 +130,14 @@ class ManagementObject : public ManagementItem
QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf);
public:
+ static int maxThreads;
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
- ManagementObject(ManagementAgent* _agent, Manageable* _core) :
+ ManagementObject(Manageable* _core) :
createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))),
destroyTime(0), updateTime(createTime), configChanged(true),
instChanged(true), deleted(false),
- coreObject(_core), agent(_agent), forcePublish(false) {}
+ coreObject(_core), forcePublish(false) {}
virtual ~ManagementObject() {}
virtual writeSchemaCall_t getWriteSchemaCall() = 0;