diff options
| author | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-10-07 21:47:35 +0000 |
| commit | 9d199b74aee76859480a7ee92d95c6db42028b43 (patch) | |
| tree | ca09aace4aaac2afa9650cc78833d30b056313a9 /cpp/src/qpid/agent | |
| parent | 41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (diff) | |
| download | qpid-python-9d199b74aee76859480a7ee92d95c6db42028b43.tar.gz | |
QPID-1327 - Event support for Management
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgent.h | 37 | ||||
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 186 | ||||
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 70 |
3 files changed, 164 insertions, 129 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h index 1c219f7463..6af9abc26b 100644 --- a/cpp/src/qpid/agent/ManagementAgent.h +++ b/cpp/src/qpid/agent/ManagementAgent.h @@ -21,6 +21,7 @@ // #include "qpid/management/ManagementObject.h" +#include "qpid/management/ManagementEvent.h" #include "qpid/management/Manageable.h" #include "qpid/sys/Mutex.h" @@ -43,8 +44,8 @@ class ManagementAgent static ManagementAgent* agent; }; - ManagementAgent () {} - virtual ~ManagementAgent () {} + ManagementAgent() {} + virtual ~ManagementAgent() {} virtual int getMaxThreads() = 0; @@ -78,10 +79,16 @@ class ManagementAgent // package initializer generated by the management code generator. // virtual void - RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) = 0; + registerClass(std::string& packageName, + std::string& className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) = 0; + + virtual void + registerEvent(std::string& packageName, + std::string& eventName, + uint8_t* md5Sum, + management::ManagementEvent::writeSchemaCall_t schemaCall) = 0; // Add a management object to the agent. Once added, this object shall be visible // in the greater management context. @@ -97,8 +104,11 @@ class ManagementAgent // pointer. This allows the management agent to report the deletion of the object // in an orderly way. // - virtual ObjectId addObject (ManagementObject* objectPtr, - uint64_t persistId = 0) = 0; + virtual ObjectId addObject(ManagementObject* objectPtr, uint64_t persistId = 0) = 0; + + // + // + virtual void raiseEvent(const ManagementEvent& event) = 0; // If "useExternalThread" was set to true in init, this method must // be called to provide a thread for any pending method calls that have arrived. @@ -113,7 +123,7 @@ class ManagementAgent // to pollCallbacks are necessary to clear the backlog. If callLimit is zero, // the return value will also be zero. // - virtual uint32_t pollCallbacks (uint32_t callLimit = 0) = 0; + virtual uint32_t pollCallbacks(uint32_t callLimit = 0) = 0; // If "useExternalThread" was set to true in the constructor, this method provides // a standard file descriptor that can be used in a select statement to signal that @@ -121,14 +131,7 @@ class ManagementAgent // least one method call). When this fd is ready-for-read, pollCallbacks may be // invoked. Calling pollCallbacks shall reset the ready-to-read state of the fd. // - virtual int getSignalFd (void) = 0; - -protected: - friend class ManagementObject; - virtual sys::Mutex& getMutex() = 0; - virtual framing::Buffer* startEventLH() = 0; - virtual void finishEventLH(framing::Buffer* buf) = 0; - + virtual int getSignalFd() = 0; }; }} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 6a1542b1f2..b178d0fc28 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -75,12 +75,13 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() return agent; } -const string ManagementAgentImpl::storeMagicNumber("MA01"); +const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : extThread(false), writeFd(-1), readFd(-1), - clientWasAdded(true), requestedBank(0), - assignedBank(0), brokerBank(0), bootSequence(0), + connected(false), lastFailure("never connected"), + clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), + assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), connThreadBody(*this), connThread(connThreadBody), pubThreadBody(*this), pubThread(pubThreadBody) { @@ -122,18 +123,24 @@ void ManagementAgentImpl::init(string brokerHost, storeData(true); } -ManagementAgentImpl::~ManagementAgentImpl() -{ +void ManagementAgentImpl::registerClass(std::string& packageName, + std::string& className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = findOrAddPackage(packageName); + addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); } -void ManagementAgentImpl::RegisterClass(std::string packageName, - std::string className, - uint8_t* md5Sum, +void ManagementAgentImpl::registerEvent(std::string& packageName, + std::string& eventName, + uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); - PackageMap::iterator pIter = FindOrAddPackage(packageName); - AddClassLocal(pIter, className, md5Sum, schemaCall); + PackageMap::iterator pIter = findOrAddPackage(packageName); + addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } ObjectId ManagementAgentImpl::addObject(ManagementObject* object, @@ -151,6 +158,23 @@ ObjectId ManagementAgentImpl::addObject(ManagementObject* object, return objectId; } +void ManagementAgentImpl::raiseEvent(const ManagementEvent& event) +{ + Mutex::ScopedLock lock(agentLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + event.encode(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.event"); +} + uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) { Mutex::ScopedLock lock(agentLock); @@ -184,10 +208,12 @@ void ManagementAgentImpl::startProtocol() char rawbuffer[512]; Buffer buffer(rawbuffer, 512); - EncodeHeader(buffer, 'A'); + connected = true; + encodeHeader(buffer, 'A'); buffer.putShortString("RemoteAgent [C++]"); systemId.encode (buffer); - buffer.putLong(requestedBank); + buffer.putLong(requestedBrokerBank); + buffer.putLong(requestedAgentBank); uint32_t length = 512 - buffer.available(); buffer.reset(); connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); @@ -197,10 +223,12 @@ void ManagementAgentImpl::storeData(bool requested) { if (!storeFile.empty()) { ofstream outFile(storeFile.c_str()); - uint32_t bankToWrite = requested ? requestedBank : assignedBank; + uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank; + uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank; if (outFile.good()) { - outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl; + outFile << storeMagicNumber << " " << brokerBankToWrite << " " << + agentBankToWrite << " " << bootSequence << endl; outFile.close(); } } @@ -215,7 +243,8 @@ void ManagementAgentImpl::retrieveData() if (inFile.good()) { inFile >> mn; if (mn == storeMagicNumber) { - inFile >> requestedBank; + inFile >> requestedBrokerBank; + inFile >> requestedAgentBank; inFile >> bootSequence; } inFile.close(); @@ -229,7 +258,7 @@ void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequen Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'z', sequence); + encodeHeader(outBuffer, 'z', sequence); outBuffer.putLong(code); outBuffer.putShortString(text); outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -241,20 +270,23 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); - brokerBank = inBuffer.getLong(); - assignedBank = inBuffer.getLong(); - if (assignedBank != requestedBank) { - if (requestedBank == 0) - cout << "Initial object-id bank assigned: " << assignedBank << endl; + assignedBrokerBank = inBuffer.getLong(); + assignedAgentBank = inBuffer.getLong(); + if ((assignedBrokerBank != requestedBrokerBank) || + (assignedAgentBank != requestedAgentBank)) { + if (requestedAgentBank == 0) + cout << "Initial object-id bank assigned: " << assignedBrokerBank << "." << + assignedAgentBank << endl; else - cout << "Collision in object-id! New bank assigned: " << assignedBank << endl; + cout << "Collision in object-id! New bank assigned: " << assignedBrokerBank << + "." << assignedAgentBank << endl; storeData(); } - attachment.setBanks(brokerBank, assignedBank); + attachment.setBanks(assignedBrokerBank, assignedAgentBank); // Bind to qpid.management to receive commands - connThreadBody.bindToBank(assignedBank); + connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank); // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); @@ -263,8 +295,8 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'p'); - EncodePackageIndication(outBuffer, pIter); + encodeHeader(outBuffer, 'p'); + encodePackageIndication(outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); @@ -273,8 +305,8 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) ClassMap cMap = pIter->second; for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) { outBuffer.reset(); - EncodeHeader(outBuffer, 'q'); - EncodeClassIndication(outBuffer, pIter, cIter); + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); @@ -294,14 +326,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { - ClassMap cMap = pIter->second; + ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - SchemaClass schema = cIter->second; + SchemaClass& schema = cIter->second; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 's', sequence); + encodeHeader(outBuffer, 's', sequence); schema.writeSchemaCall(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); @@ -331,7 +363,7 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc inBuffer.getBin128(hash); inBuffer.getShortString(methodName); - EncodeHeader(outBuffer, 'm', sequence); + encodeHeader(outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { @@ -344,7 +376,14 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); } else - iter->second->doMethod(methodName, inBuffer, outBuffer); + try { + outBuffer.record(); + iter->second->doMethod(methodName, inBuffer, outBuffer); + } catch(std::exception& e) { + outBuffer.restore(); + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putShortString(e.what()); + } } outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -379,7 +418,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'g', sequence); + encodeHeader(outBuffer, 'g', sequence); object->writeProperties(outBuffer); object->writeStatistics(outBuffer, true); outLen = MA_BUFFER_SIZE - outBuffer.available(); @@ -419,7 +458,7 @@ void ManagementAgentImpl::received(Message& msg) replyToKey = rt.getRoutingKey(); } - if (CheckHeader(inBuffer, &opcode, &sequence)) + if (checkHeader(inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); @@ -429,16 +468,16 @@ void ManagementAgentImpl::received(Message& msg) } } -void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { buf.putOctet('A'); buf.putOctet('M'); - buf.putOctet('1'); + buf.putOctet('2'); buf.putOctet(opcode); buf.putLong (seq); } -bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) return false; @@ -450,10 +489,10 @@ bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *se *opcode = buf.getOctet(); *seq = buf.getLong(); - return h1 == 'A' && h2 == 'M' && h3 == '1'; + return h1 == 'A' && h2 == 'M' && h3 == '2'; } -ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name) +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name) { PackageMap::iterator pIter = packages.find(name); if (pIter != packages.end()) @@ -467,8 +506,8 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage( Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader(outBuffer, 'p'); - EncodePackageIndication(outBuffer, result.first); + encodeHeader(outBuffer, 'p'); + encodePackageIndication(outBuffer, result.first); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package"); @@ -486,8 +525,9 @@ void ManagementAgentImpl::moveNewObjectsLH() newManagementObjects.clear(); } -void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter, - string className, +void ManagementAgentImpl::addClassLocal(uint8_t classKind, + PackageMap::iterator pIter, + const string& className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall) { @@ -502,32 +542,28 @@ void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter, return; // No such class found, create a new class with local information. - SchemaClass classInfo; - - classInfo.writeSchemaCall = schemaCall; - cMap[key] = classInfo; - - // TODO: Publish a class-indication message + cMap.insert(std::pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind))); } -void ManagementAgentImpl::EncodePackageIndication(Buffer& buf, +void ManagementAgentImpl::encodePackageIndication(Buffer& buf, PackageMap::iterator pIter) { buf.putShortString((*pIter).first); } -void ManagementAgentImpl::EncodeClassIndication(Buffer& buf, +void ManagementAgentImpl::encodeClassIndication(Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; + buf.putOctet((*cIter).second.kind); buf.putShortString((*pIter).first); buf.putShortString(key.name); - buf.putBin128 (key.hash); + buf.putBin128(key.hash); } -void ManagementAgentImpl::PeriodicProcessing() +void ManagementAgentImpl::periodicProcessing() { #define BUFSIZE 65536 Mutex::ScopedLock lock(agentLock); @@ -536,9 +572,12 @@ void ManagementAgentImpl::PeriodicProcessing() string routingKey; std::list<ObjectId> deleteList; + if (!connected) + return; + { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'h'); + encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); contentSize = BUFSIZE - msgBuffer.available(); @@ -573,7 +612,7 @@ void ManagementAgentImpl::PeriodicProcessing() if (object->getConfigChanged() || object->isDeleted()) { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'c'); + encodeHeader(msgBuffer, 'c'); object->writeProperties(msgBuffer); contentSize = BUFSIZE - msgBuffer.available(); @@ -585,7 +624,7 @@ void ManagementAgentImpl::PeriodicProcessing() if (object->getInstChanged()) { Buffer msgBuffer(msgChars, BUFSIZE); - EncodeHeader(msgBuffer, 'i'); + encodeHeader(msgBuffer, 'i'); object->writeStatistics(msgBuffer); contentSize = BUFSIZE - msgBuffer.available(); @@ -664,8 +703,8 @@ ManagementAgentImpl::ConnectionThread::~ConnectionThread() void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, uint32_t length, - string exchange, - string routingKey) + const string& exchange, + const string& routingKey) { { Mutex::ScopedLock _lock(connLock); @@ -683,10 +722,10 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, session.messageTransfer(arg::content=msg, arg::destination=exchange); } -void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank) +void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank) { stringstream key; - key << "agent." << agentBank; + key << "agent." << brokerBank << "." << agentBank; session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(), arg::bindingKey=key.str()); } @@ -695,28 +734,7 @@ void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank) void ManagementAgentImpl::PublishThread::run() { while (true) { - ::sleep(5); - agent.PeriodicProcessing(); + ::sleep(agent.getInterval()); + agent.periodicProcessing(); } } - -Mutex& ManagementAgentImpl::getMutex() -{ - return agentLock; -} - -Buffer* ManagementAgentImpl::startEventLH() -{ - Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); - EncodeHeader(*outBuffer, 'e'); - outBuffer->putLongLong(uint64_t(Duration(now()))); - return outBuffer; -} - -void ManagementAgentImpl::finishEventLH(Buffer* outBuffer) -{ - uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); - outBuffer->reset(); - connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event"); - delete outBuffer; -} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 7d9be6daf9..a964694690 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -43,24 +43,34 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen public: ManagementAgentImpl(); - virtual ~ManagementAgentImpl(); + virtual ~ManagementAgentImpl() {}; + // + // Methods from ManagementAgent + // int getMaxThreads() { return 1; } void init(std::string brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, bool useExternalThread = false, std::string storeFile = ""); - void RegisterClass(std::string packageName, - std::string className, - uint8_t* md5Sum, + bool isConnected() { return connected; } + std::string& getLastFailure() { return lastFailure; } + void registerClass(std::string& packageName, + std::string& className, + uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); - ObjectId addObject (management::ManagementObject* objectPtr, - uint64_t persistId = 0); - uint32_t pollCallbacks (uint32_t callLimit = 0); - int getSignalFd (void); + void registerEvent(std::string& packageName, + std::string& eventName, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0); + void raiseEvent(const management::ManagementEvent& event); + uint32_t pollCallbacks(uint32_t callLimit = 0); + int getSignalFd(); - void PeriodicProcessing(); + uint16_t getInterval() { return interval; } + void periodicProcessing(); private: @@ -84,8 +94,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen struct SchemaClass { management::ManagementObject::writeSchemaCall_t writeSchemaCall; + uint8_t kind; - SchemaClass () : writeSchemaCall(0) {} + SchemaClass(const management::ManagementObject::writeSchemaCall_t call, + const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {} }; struct QueuedMethod { @@ -120,12 +132,16 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen framing::Uuid systemId; std::string host; uint16_t port; + bool connected; + std::string lastFailure; + + bool clientWasAdded; + uint32_t requestedBrokerBank; + uint32_t requestedAgentBank; + uint32_t assignedBrokerBank; + uint32_t assignedAgentBank; + uint16_t bootSequence; - bool clientWasAdded; - uint32_t requestedBank; - uint32_t assignedBank; - uint32_t brokerBank; - uint16_t bootSequence; # define MA_BUFFER_SIZE 65536 char outputBuffer[MA_BUFFER_SIZE]; char eventBuffer[MA_BUFFER_SIZE]; @@ -148,9 +164,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen ~ConnectionThread(); void sendBuffer(qpid::framing::Buffer& buf, uint32_t length, - std::string exchange, - std::string routingKey); - void bindToBank(uint32_t agentBank); + const std::string& exchange, + const std::string& routingKey); + void bindToBank(uint32_t brokerBank, uint32_t agentBank); }; class PublishThread : public sys::Runnable @@ -171,19 +187,20 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void startProtocol(); void storeData(bool requested=false); void retrieveData(); - PackageMap::iterator FindOrAddPackage (std::string name); + PackageMap::iterator findOrAddPackage(const std::string& name); void moveNewObjectsLH(); - void AddClassLocal (PackageMap::iterator pIter, - std::string className, + void addClassLocal (uint8_t classKind, + PackageMap::iterator pIter, + const std::string& className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); - void EncodePackageIndication (qpid::framing::Buffer& buf, + void encodePackageIndication (framing::Buffer& buf, PackageMap::iterator pIter); - void EncodeClassIndication (qpid::framing::Buffer& buf, + void encodeClassIndication (framing::Buffer& buf, PackageMap::iterator pIter, ClassMap::iterator cIter); - void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void sendCommandComplete (std::string replyToKey, uint32_t sequence, uint32_t code = 0, std::string text = std::string("OK")); void handleAttachResponse (qpid::framing::Buffer& inBuffer); @@ -194,9 +211,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); - sys::Mutex& getMutex(); - framing::Buffer* startEventLH(); - void finishEventLH(framing::Buffer* outBuffer); }; }} |
