diff options
| author | Ted Ross <tross@apache.org> | 2009-05-11 14:16:52 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2009-05-11 14:16:52 +0000 |
| commit | a1b440e5393206ec5833e2d6c2617c2aca71701f (patch) | |
| tree | edbe7aad7a01122986380860c4cedd95086a282a /cpp/src/qpid/management | |
| parent | ec0e348d1d14679f72ce704555dd2605880bddfa (diff) | |
| download | qpid-python-a1b440e5393206ec5833e2d6c2617c2aca71701f.tar.gz | |
QPID-1843 - Cleaned up the interface to the broker's internal management agent.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@773570 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp (renamed from cpp/src/qpid/management/ManagementBroker.cpp) | 152 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h (renamed from cpp/src/qpid/management/ManagementBroker.h) | 68 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementExchange.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 10 |
6 files changed, 115 insertions, 141 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 19300ef1af..77277070d9 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -19,7 +19,8 @@ * */ -#include "ManagementBroker.h" +#include "ManagementAgent.h" +#include "ManagementObject.h" #include "IdAllocator.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" @@ -41,45 +42,13 @@ using namespace qpid::sys; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; -Mutex ManagementAgent::Singleton::lock; -bool ManagementAgent::Singleton::disabled = false; -ManagementAgent* ManagementAgent::Singleton::agent = 0; -int ManagementAgent::Singleton::refCount = 0; - -ManagementAgent::Singleton::Singleton(bool disableManagement) -{ - Mutex::ScopedLock _lock(lock); - if (disableManagement && !disabled) { - disabled = true; - assert(refCount == 0); // can't disable after agent has been allocated - } - if (refCount == 0 && !disabled) - agent = new ManagementBroker(); - refCount++; -} - -ManagementAgent::Singleton::~Singleton() -{ - Mutex::ScopedLock _lock(lock); - refCount--; - if (refCount == 0 && !disabled) { - delete agent; - agent = 0; - } -} - -ManagementAgent* ManagementAgent::Singleton::getInstance() -{ - return agent; -} - -ManagementBroker::RemoteAgent::~RemoteAgent () +ManagementAgent::RemoteAgent::~RemoteAgent () { if (mgmtObject != 0) mgmtObject->resourceDestroy(); } -ManagementBroker::ManagementBroker () : +ManagementAgent::ManagementAgent () : threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now()))) { nextObjectId = 1; @@ -90,7 +59,7 @@ ManagementBroker::ManagementBroker () : clientWasAdded = false; } -ManagementBroker::~ManagementBroker () +ManagementAgent::~ManagementAgent () { timer.stop(); { @@ -114,20 +83,21 @@ ManagementBroker::~ManagementBroker () } } -void ManagementBroker::configure(const string& _dataDir, uint16_t _interval, +void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, qpid::broker::Broker* _broker, int _threads) { dataDir = _dataDir; interval = _interval; broker = _broker; threadPoolSize = _threads; + ManagementObject::maxThreads = threadPoolSize; timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); // Get from file or generate and save to file. if (dataDir.empty()) { uuid.generate(); - QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " + QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " << uuid); } else @@ -141,7 +111,7 @@ void ManagementBroker::configure(const string& _dataDir, uint16_t _interval, inFile >> bootSequence; inFile >> nextRemoteBank; inFile.close(); - QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); + QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. bootSequence++; @@ -152,15 +122,15 @@ void ManagementBroker::configure(const string& _dataDir, uint16_t _interval, else { uuid.generate(); - QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); + QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); writeData(); } - QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); + QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); } } -void ManagementBroker::writeData () +void ManagementAgent::writeData () { string filename (dataDir + "/.mbrokerdata"); ofstream outFile (filename.c_str ()); @@ -172,14 +142,14 @@ void ManagementBroker::writeData () } } -void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, +void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, qpid::broker::Exchange::shared_ptr _dexchange) { mExchange = _mexchange; dExchange = _dexchange; } -void ManagementBroker::registerClass (const string& packageName, +void ManagementAgent::registerClass (const string& packageName, const string& className, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) @@ -189,7 +159,7 @@ void ManagementBroker::registerClass (const string& packageName, addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); } -void ManagementBroker::registerEvent (const string& packageName, +void ManagementAgent::registerEvent (const string& packageName, const string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) @@ -199,7 +169,7 @@ void ManagementBroker::registerEvent (const string& packageName, addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } -ObjectId ManagementBroker::addObject (ManagementObject* object, +ObjectId ManagementAgent::addObject (ManagementObject* object, uint64_t persistId) { Mutex::ScopedLock lock (addLock); @@ -221,7 +191,7 @@ ObjectId ManagementBroker::addObject (ManagementObject* object, return objId; } -void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t severity) +void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) { Mutex::ScopedLock lock (userLock); Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); @@ -241,18 +211,18 @@ void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t sever "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); } -ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) - : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {} +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), agent(_agent) {} -ManagementBroker::Periodic::~Periodic () {} +ManagementAgent::Periodic::~Periodic () {} -void ManagementBroker::Periodic::fire () +void ManagementAgent::Periodic::fire () { - broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval))); - broker.periodicProcessing (); + agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval))); + agent.periodicProcessing (); } -void ManagementBroker::clientAdded (const std::string& routingKey) +void ManagementAgent::clientAdded (const std::string& routingKey) { if (routingKey.find("console") != 0) return; @@ -272,7 +242,7 @@ void ManagementBroker::clientAdded (const std::string& routingKey) } } -void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet ('A'); buf.putOctet ('M'); @@ -281,7 +251,7 @@ void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) buf.putLong (seq); } -bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) { uint8_t h1 = buf.getOctet(); uint8_t h2 = buf.getOctet(); @@ -293,7 +263,7 @@ bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) return h1 == 'A' && h2 == 'M' && h3 == '2'; } -void ManagementBroker::sendBuffer(Buffer& buf, +void ManagementAgent::sendBuffer(Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, string routingKey) @@ -327,7 +297,7 @@ void ManagementBroker::sendBuffer(Buffer& buf, } catch(exception&) {} } -void ManagementBroker::moveNewObjectsLH() +void ManagementAgent::moveNewObjectsLH() { Mutex::ScopedLock lock (addLock); for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); @@ -337,7 +307,7 @@ void ManagementBroker::moveNewObjectsLH() newManagementObjects.clear(); } -void ManagementBroker::periodicProcessing (void) +void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 Mutex::ScopedLock lock (userLock); @@ -421,7 +391,7 @@ void ManagementBroker::periodicProcessing (void) } } -void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, +void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, uint32_t code, string text) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -435,7 +405,7 @@ void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence sendBuffer (outBuffer, outLen, dExchange, replyToKey); } -bool ManagementBroker::dispatchCommand (Deliverable& deliverable, +bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const string& routingKey, const FieldTable* /*args*/) { @@ -471,7 +441,7 @@ bool ManagementBroker::dispatchCommand (Deliverable& deliverable, return true; } -void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, +void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string methodName; @@ -532,7 +502,7 @@ void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKe sendBuffer(outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) +void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) { Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; @@ -545,7 +515,7 @@ void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32 sendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) +void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) { for (PackageMap::iterator pIter = packages.begin (); pIter != packages.end (); @@ -564,7 +534,7 @@ void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_ sendCommandComplete (replyToKey, sequence); } -void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) +void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) { string packageName; @@ -572,7 +542,7 @@ void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey findOrAddPackageLH(packageName); } -void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; @@ -601,7 +571,7 @@ void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, u sendCommandComplete(replyToKey, sequence); } -void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) +void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) { string packageName; SchemaClassKey key; @@ -633,7 +603,7 @@ void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, ui } } -void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) +void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) { // If the management package is attached locally (embedded in the broker or // linked in via plug-in), call the schema handler directly. If the package @@ -645,7 +615,7 @@ void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(buffer, bufferLen); } -void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -680,7 +650,7 @@ void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey sendCommandComplete(replyToKey, sequence, 1, "Package not found"); } -void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -699,7 +669,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { size_t length = validateSchema(inBuffer, cIter->second.kind); if (length == 0) { - QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); + QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name); cMap.erase(key); } else { cIter->second.buffer = (uint8_t*) malloc(length); @@ -720,7 +690,7 @@ void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyTo } } -bool ManagementBroker::bankInUse (uint32_t bank) +bool ManagementAgent::bankInUse (uint32_t bank) { for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); @@ -730,7 +700,7 @@ bool ManagementBroker::bankInUse (uint32_t bank) return false; } -uint32_t ManagementBroker::allocateNewBank () +uint32_t ManagementAgent::allocateNewBank () { while (bankInUse (nextRemoteBank)) nextRemoteBank++; @@ -740,14 +710,14 @@ uint32_t ManagementBroker::allocateNewBank () return allocated; } -uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) +uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank) { if (requestedBank == 0 || bankInUse (requestedBank)) return allocateNewBank (); return requestedBank; } -void ManagementBroker::deleteOrphanedAgentsLH() +void ManagementAgent::deleteOrphanedAgentsLH() { vector<ObjectId> deleteList; @@ -776,7 +746,7 @@ void ManagementBroker::deleteOrphanedAgentsLH() deleteList.clear(); } -void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) +void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; uint32_t requestedBrokerBank, requestedAgentBank; @@ -827,7 +797,7 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe sendBuffer (outBuffer, outLen, dExchange, replyToKey); } -void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) { FieldTable ft; FieldTable::ValuePtr value; @@ -887,7 +857,7 @@ void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, ui sendCommandComplete(replyToKey, sequence); } -bool ManagementBroker::authorizeAgentMessageLH(Message& msg) +bool ManagementAgent::authorizeAgentMessageLH(Message& msg) { Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; @@ -951,7 +921,7 @@ bool ManagementBroker::authorizeAgentMessageLH(Message& msg) return true; } -void ManagementBroker::dispatchAgentCommandLH(Message& msg) +void ManagementAgent::dispatchAgentCommandLH(Message& msg) { Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); uint8_t opcode; @@ -968,7 +938,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) return; if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementBroker::dispatchAgentCommandLH: Message too large: " << + QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.encodedSize()); return; } @@ -994,7 +964,7 @@ void ManagementBroker::dispatchAgentCommandLH(Message& msg) } } -ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(string name) +ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name) { PackageMap::iterator pIter = packages.find (name); if (pIter != packages.end ()) @@ -1003,7 +973,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri // No such package found, create a new map entry. pair<PackageMap::iterator, bool> result = packages.insert(pair<string, ClassMap>(name, ClassMap())); - QPID_LOG (debug, "ManagementBroker added package " << name); + QPID_LOG (debug, "ManagementAgent added package " << name); // Publish a package-indication message Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -1018,7 +988,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri return result.first; } -void ManagementBroker::addClassLH(uint8_t kind, +void ManagementAgent::addClassLH(uint8_t kind, PackageMap::iterator pIter, const string& className, uint8_t* md5Sum, @@ -1035,20 +1005,20 @@ void ManagementBroker::addClassLH(uint8_t kind, return; // No such class found, create a new class with local information. - QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" << + QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" << key.name); cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); cIter = cMap.find(key); } -void ManagementBroker::encodePackageIndication(Buffer& buf, +void ManagementAgent::encodePackageIndication(Buffer& buf, PackageMap::iterator pIter) { buf.putShortString((*pIter).first); } -void ManagementBroker::encodeClassIndication(Buffer& buf, +void ManagementAgent::encodeClassIndication(Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter) { @@ -1060,7 +1030,7 @@ void ManagementBroker::encodeClassIndication(Buffer& buf, buf.putBin128(key.hash); } -size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind) +size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind) { if (kind == ManagementItem::CLASS_KIND_TABLE) return validateTableSchema(inBuffer); @@ -1069,7 +1039,7 @@ size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind) return 0; } -size_t ManagementBroker::validateTableSchema(Buffer& inBuffer) +size_t ManagementAgent::validateTableSchema(Buffer& inBuffer) { uint32_t start = inBuffer.getPosition(); uint32_t end; @@ -1115,7 +1085,7 @@ size_t ManagementBroker::validateTableSchema(Buffer& inBuffer) return end - start; } -size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) +size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) { uint32_t start = inBuffer.getPosition(); uint32_t end; @@ -1147,13 +1117,13 @@ size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) return end - start; } -void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a) +void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { Mutex::ScopedLock lock (addLock); allocator = a; } -uint64_t ManagementBroker::allocateId(Manageable* object) +uint64_t ManagementAgent::allocateId(Manageable* object) { Mutex::ScopedLock lock (addLock); if (allocator.get()) return allocator->getIdFor(object); diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementAgent.h index a57f73be15..2411e6c277 100644 --- a/cpp/src/qpid/management/ManagementBroker.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -1,5 +1,5 @@ -#ifndef _ManagementBroker_ -#define _ManagementBroker_ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ /* * @@ -21,14 +21,15 @@ * under the License. * */ +#include "qpid/broker/BrokerImportExport.h" #include "qpid/Options.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Timer.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" #include "qpid/broker/ConnectionToken.h" -#include "qpid/agent/ManagementAgent.h" #include "ManagementObject.h" +#include "ManagementEvent.h" #include "Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" #include <qpid/framing/AMQFrame.h> @@ -39,15 +40,27 @@ namespace management { struct IdAllocator; -class ManagementBroker : public ManagementAgent +class ManagementAgent { private: int threadPoolSize; public: - ManagementBroker (); - virtual ~ManagementBroker (); + typedef enum { + SEV_EMERG = 0, + SEV_ALERT = 1, + SEV_CRIT = 2, + SEV_ERROR = 3, + SEV_WARN = 4, + SEV_NOTE = 5, + SEV_INFO = 6, + SEV_DEBUG = 7, + SEV_DEFAULT = 8 + } severity_t; + + ManagementAgent (); + virtual ~ManagementAgent (); void configure (const std::string& dataDir, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize); @@ -55,41 +68,34 @@ public: void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, qpid::broker::Exchange::shared_ptr directExchange); int getMaxThreads () { return threadPoolSize; } - void registerClass (const std::string& packageName, - const std::string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void registerEvent (const std::string& packageName, - const std::string& eventName, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0); - void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); - void clientAdded (const std::string& routingKey); + QPID_BROKER_EXTERN void registerClass (const std::string& packageName, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN void registerEvent (const std::string& packageName, + const std::string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, + uint64_t persistId = 0); + QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, + severity_t severity = SEV_DEFAULT); + QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); + bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); - const framing::Uuid& getUuid() const { return uuid; } - // Stubs for remote management agent calls - void init(const std::string&, uint16_t, uint16_t, bool, - const std::string&, const std::string&, const std::string&, - const std::string&, const std::string&) { assert(0); } - void init(const client::ConnectionSettings&, uint16_t, bool, const std::string&) { assert(0); } - uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } - int getSignalFd () { assert(0); return -1; } + const framing::Uuid& getUuid() const { return uuid; } void setAllocator(std::auto_ptr<IdAllocator> allocator); uint64_t allocateId(Manageable* object); private: - friend class ManagementAgent; - struct Periodic : public qpid::broker::TimerTask { - ManagementBroker& broker; + ManagementAgent& agent; - Periodic (ManagementBroker& broker, uint32_t seconds); + Periodic (ManagementAgent& agent, uint32_t seconds); virtual ~Periodic (); void fire (); }; @@ -239,4 +245,4 @@ private: }} -#endif /*!_ManagementBroker_*/ +#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index 4dcafbfcdd..0793b2d18c 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -27,14 +27,14 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) : - Exchange (_name, _parent), TopicExchange(_name, _parent) {} +ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} ManagementExchange::ManagementExchange (const std::string& _name, bool _durable, const FieldTable& _args, - Manageable* _parent) : - Exchange (_name, _durable, _args, _parent), - TopicExchange(_name, _durable, _args, _parent) {} + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, _args, _parent, b), + TopicExchange(_name, _durable, _args, _parent, b) {} void ManagementExchange::route (Deliverable& msg, const string& routingKey, @@ -60,7 +60,7 @@ bool ManagementExchange::bind (Queue::shared_ptr queue, return TopicExchange::bind(queue, routingKey, args); } -void ManagementExchange::setManagmentAgent (ManagementBroker* agent) +void ManagementExchange::setManagmentAgent (ManagementAgent* agent) { managementAgent = agent; } diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index d54db1a74e..5e51683515 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -22,7 +22,7 @@ #define _ManagementExchange_ #include "qpid/broker/TopicExchange.h" -#include "ManagementBroker.h" +#include "ManagementAgent.h" namespace qpid { namespace broker { @@ -30,15 +30,15 @@ namespace broker { class ManagementExchange : public virtual TopicExchange { private: - management::ManagementBroker* managementAgent; + management::ManagementAgent* managementAgent; public: static const std::string typeName; - ManagementExchange (const string& name, Manageable* _parent = 0); + ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0); ManagementExchange (const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - Manageable* _parent = 0); + Manageable* _parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } @@ -50,7 +50,7 @@ class ManagementExchange : public virtual TopicExchange const string& routingKey, const qpid::framing::FieldTable* args); - void setManagmentAgent (management::ManagementBroker* agent); + void setManagmentAgent (management::ManagementAgent* agent); virtual ~ManagementExchange(); }; diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index f4c45de126..08008b3d79 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -21,7 +21,6 @@ #include "Manageable.h" #include "ManagementObject.h" -#include "qpid/agent/ManagementAgent.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Thread.h" @@ -156,6 +155,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) }} +int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; void ManagementObject::writeTimestamps (framing::Buffer& buf) @@ -176,7 +176,7 @@ int ManagementObject::getThreadIndex() { if (thisIndex == -1) { sys::Mutex::ScopedLock mutex(accessLock); thisIndex = nextThreadIndex; - if (nextThreadIndex < agent->getMaxThreads() - 1) + if (nextThreadIndex < maxThreads - 1) nextThreadIndex++; } return thisIndex; diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 498169318d..15c2307886 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -32,7 +32,6 @@ namespace qpid { namespace management { class Manageable; -class ManagementAgent; class ObjectId; @@ -111,7 +110,7 @@ public: class ManagementObject : public ManagementItem { - protected: +protected: uint64_t createTime; uint64_t destroyTime; @@ -122,8 +121,6 @@ class ManagementObject : public ManagementItem bool deleted; Manageable* coreObject; sys::Mutex accessLock; - ManagementAgent* agent; - int maxThreads; uint32_t flags; static int nextThreadIndex; @@ -133,13 +130,14 @@ class ManagementObject : public ManagementItem QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf); public: + static int maxThreads; typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - ManagementObject(ManagementAgent* _agent, Manageable* _core) : + ManagementObject(Manageable* _core) : createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))), destroyTime(0), updateTime(createTime), configChanged(true), instChanged(true), deleted(false), - coreObject(_core), agent(_agent), forcePublish(false) {} + coreObject(_core), forcePublish(false) {} virtual ~ManagementObject() {} virtual writeSchemaCall_t getWriteSchemaCall() = 0; |
