summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/agent
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-10-07 21:47:35 +0000
committerTed Ross <tross@apache.org>2008-10-07 21:47:35 +0000
commit9d199b74aee76859480a7ee92d95c6db42028b43 (patch)
treeca09aace4aaac2afa9650cc78833d30b056313a9 /cpp/src/qpid/agent
parent41d33af55b9fbf4c664ccb56accb1a37bd1ef006 (diff)
downloadqpid-python-9d199b74aee76859480a7ee92d95c6db42028b43.tar.gz
QPID-1327 - Event support for Management
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@702651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
-rw-r--r--cpp/src/qpid/agent/ManagementAgent.h37
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp186
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h70
3 files changed, 164 insertions, 129 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h
index 1c219f7463..6af9abc26b 100644
--- a/cpp/src/qpid/agent/ManagementAgent.h
+++ b/cpp/src/qpid/agent/ManagementAgent.h
@@ -21,6 +21,7 @@
//
#include "qpid/management/ManagementObject.h"
+#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/Mutex.h"
@@ -43,8 +44,8 @@ class ManagementAgent
static ManagementAgent* agent;
};
- ManagementAgent () {}
- virtual ~ManagementAgent () {}
+ ManagementAgent() {}
+ virtual ~ManagementAgent() {}
virtual int getMaxThreads() = 0;
@@ -78,10 +79,16 @@ class ManagementAgent
// package initializer generated by the management code generator.
//
virtual void
- RegisterClass (std::string packageName,
- std::string className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall) = 0;
+ registerClass(std::string& packageName,
+ std::string& className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall) = 0;
+
+ virtual void
+ registerEvent(std::string& packageName,
+ std::string& eventName,
+ uint8_t* md5Sum,
+ management::ManagementEvent::writeSchemaCall_t schemaCall) = 0;
// Add a management object to the agent. Once added, this object shall be visible
// in the greater management context.
@@ -97,8 +104,11 @@ class ManagementAgent
// pointer. This allows the management agent to report the deletion of the object
// in an orderly way.
//
- virtual ObjectId addObject (ManagementObject* objectPtr,
- uint64_t persistId = 0) = 0;
+ virtual ObjectId addObject(ManagementObject* objectPtr, uint64_t persistId = 0) = 0;
+
+ //
+ //
+ virtual void raiseEvent(const ManagementEvent& event) = 0;
// If "useExternalThread" was set to true in init, this method must
// be called to provide a thread for any pending method calls that have arrived.
@@ -113,7 +123,7 @@ class ManagementAgent
// to pollCallbacks are necessary to clear the backlog. If callLimit is zero,
// the return value will also be zero.
//
- virtual uint32_t pollCallbacks (uint32_t callLimit = 0) = 0;
+ virtual uint32_t pollCallbacks(uint32_t callLimit = 0) = 0;
// If "useExternalThread" was set to true in the constructor, this method provides
// a standard file descriptor that can be used in a select statement to signal that
@@ -121,14 +131,7 @@ class ManagementAgent
// least one method call). When this fd is ready-for-read, pollCallbacks may be
// invoked. Calling pollCallbacks shall reset the ready-to-read state of the fd.
//
- virtual int getSignalFd (void) = 0;
-
-protected:
- friend class ManagementObject;
- virtual sys::Mutex& getMutex() = 0;
- virtual framing::Buffer* startEventLH() = 0;
- virtual void finishEventLH(framing::Buffer* buf) = 0;
-
+ virtual int getSignalFd() = 0;
};
}}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 6a1542b1f2..b178d0fc28 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -75,12 +75,13 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
return agent;
}
-const string ManagementAgentImpl::storeMagicNumber("MA01");
+const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
extThread(false), writeFd(-1), readFd(-1),
- clientWasAdded(true), requestedBank(0),
- assignedBank(0), brokerBank(0), bootSequence(0),
+ connected(false), lastFailure("never connected"),
+ clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
+ assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
connThreadBody(*this), connThread(connThreadBody),
pubThreadBody(*this), pubThread(pubThreadBody)
{
@@ -122,18 +123,24 @@ void ManagementAgentImpl::init(string brokerHost,
storeData(true);
}
-ManagementAgentImpl::~ManagementAgentImpl()
-{
+void ManagementAgentImpl::registerClass(std::string& packageName,
+ std::string& className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = findOrAddPackage(packageName);
+ addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
}
-void ManagementAgentImpl::RegisterClass(std::string packageName,
- std::string className,
- uint8_t* md5Sum,
+void ManagementAgentImpl::registerEvent(std::string& packageName,
+ std::string& eventName,
+ uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = FindOrAddPackage(packageName);
- AddClassLocal(pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = findOrAddPackage(packageName);
+ addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
@@ -151,6 +158,23 @@ ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
return objectId;
}
+void ManagementAgentImpl::raiseEvent(const ManagementEvent& event)
+{
+ Mutex::ScopedLock lock(agentLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ event.encode(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.event");
+}
+
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
{
Mutex::ScopedLock lock(agentLock);
@@ -184,10 +208,12 @@ void ManagementAgentImpl::startProtocol()
char rawbuffer[512];
Buffer buffer(rawbuffer, 512);
- EncodeHeader(buffer, 'A');
+ connected = true;
+ encodeHeader(buffer, 'A');
buffer.putShortString("RemoteAgent [C++]");
systemId.encode (buffer);
- buffer.putLong(requestedBank);
+ buffer.putLong(requestedBrokerBank);
+ buffer.putLong(requestedAgentBank);
uint32_t length = 512 - buffer.available();
buffer.reset();
connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
@@ -197,10 +223,12 @@ void ManagementAgentImpl::storeData(bool requested)
{
if (!storeFile.empty()) {
ofstream outFile(storeFile.c_str());
- uint32_t bankToWrite = requested ? requestedBank : assignedBank;
+ uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank;
+ uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank;
if (outFile.good()) {
- outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl;
+ outFile << storeMagicNumber << " " << brokerBankToWrite << " " <<
+ agentBankToWrite << " " << bootSequence << endl;
outFile.close();
}
}
@@ -215,7 +243,8 @@ void ManagementAgentImpl::retrieveData()
if (inFile.good()) {
inFile >> mn;
if (mn == storeMagicNumber) {
- inFile >> requestedBank;
+ inFile >> requestedBrokerBank;
+ inFile >> requestedAgentBank;
inFile >> bootSequence;
}
inFile.close();
@@ -229,7 +258,7 @@ void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequen
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'z', sequence);
+ encodeHeader(outBuffer, 'z', sequence);
outBuffer.putLong(code);
outBuffer.putShortString(text);
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -241,20 +270,23 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
{
Mutex::ScopedLock lock(agentLock);
- brokerBank = inBuffer.getLong();
- assignedBank = inBuffer.getLong();
- if (assignedBank != requestedBank) {
- if (requestedBank == 0)
- cout << "Initial object-id bank assigned: " << assignedBank << endl;
+ assignedBrokerBank = inBuffer.getLong();
+ assignedAgentBank = inBuffer.getLong();
+ if ((assignedBrokerBank != requestedBrokerBank) ||
+ (assignedAgentBank != requestedAgentBank)) {
+ if (requestedAgentBank == 0)
+ cout << "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
+ assignedAgentBank << endl;
else
- cout << "Collision in object-id! New bank assigned: " << assignedBank << endl;
+ cout << "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
+ "." << assignedAgentBank << endl;
storeData();
}
- attachment.setBanks(brokerBank, assignedBank);
+ attachment.setBanks(assignedBrokerBank, assignedAgentBank);
// Bind to qpid.management to receive commands
- connThreadBody.bindToBank(assignedBank);
+ connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
// Send package indications for all local packages
for (PackageMap::iterator pIter = packages.begin();
@@ -263,8 +295,8 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'p');
- EncodePackageIndication(outBuffer, pIter);
+ encodeHeader(outBuffer, 'p');
+ encodePackageIndication(outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
@@ -273,8 +305,8 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
ClassMap cMap = pIter->second;
for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
outBuffer.reset();
- EncodeHeader(outBuffer, 'q');
- EncodeClassIndication(outBuffer, pIter, cIter);
+ encodeHeader(outBuffer, 'q');
+ encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
@@ -294,14 +326,14 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
- ClassMap cMap = pIter->second;
+ ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- SchemaClass schema = cIter->second;
+ SchemaClass& schema = cIter->second;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 's', sequence);
+ encodeHeader(outBuffer, 's', sequence);
schema.writeSchemaCall(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
@@ -331,7 +363,7 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
@@ -344,7 +376,14 @@ void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequenc
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
}
else
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ try {
+ outBuffer.record();
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
+ } catch(std::exception& e) {
+ outBuffer.restore();
+ outBuffer.putLong(Manageable::STATUS_EXCEPTION);
+ outBuffer.putShortString(e.what());
+ }
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -379,7 +418,7 @@ void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, st
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'g', sequence);
+ encodeHeader(outBuffer, 'g', sequence);
object->writeProperties(outBuffer);
object->writeStatistics(outBuffer, true);
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -419,7 +458,7 @@ void ManagementAgentImpl::received(Message& msg)
replyToKey = rt.getRoutingKey();
}
- if (CheckHeader(inBuffer, &opcode, &sequence))
+ if (checkHeader(inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
@@ -429,16 +468,16 @@ void ManagementAgentImpl::received(Message& msg)
}
}
-void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet('A');
buf.putOctet('M');
- buf.putOctet('1');
+ buf.putOctet('2');
buf.putOctet(opcode);
buf.putLong (seq);
}
-bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
if (buf.getSize() < 8)
return false;
@@ -450,10 +489,10 @@ bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *se
*opcode = buf.getOctet();
*seq = buf.getLong();
- return h1 == 'A' && h2 == 'M' && h3 == '1';
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name)
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name)
{
PackageMap::iterator pIter = packages.find(name);
if (pIter != packages.end())
@@ -467,8 +506,8 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'p');
- EncodePackageIndication(outBuffer, result.first);
+ encodeHeader(outBuffer, 'p');
+ encodePackageIndication(outBuffer, result.first);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package");
@@ -486,8 +525,9 @@ void ManagementAgentImpl::moveNewObjectsLH()
newManagementObjects.clear();
}
-void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter,
- string className,
+void ManagementAgentImpl::addClassLocal(uint8_t classKind,
+ PackageMap::iterator pIter,
+ const string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall)
{
@@ -502,32 +542,28 @@ void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter,
return;
// No such class found, create a new class with local information.
- SchemaClass classInfo;
-
- classInfo.writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
-
- // TODO: Publish a class-indication message
+ cMap.insert(std::pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind)));
}
-void ManagementAgentImpl::EncodePackageIndication(Buffer& buf,
+void ManagementAgentImpl::encodePackageIndication(Buffer& buf,
PackageMap::iterator pIter)
{
buf.putShortString((*pIter).first);
}
-void ManagementAgentImpl::EncodeClassIndication(Buffer& buf,
+void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter)
{
SchemaClassKey key = (*cIter).first;
+ buf.putOctet((*cIter).second.kind);
buf.putShortString((*pIter).first);
buf.putShortString(key.name);
- buf.putBin128 (key.hash);
+ buf.putBin128(key.hash);
}
-void ManagementAgentImpl::PeriodicProcessing()
+void ManagementAgentImpl::periodicProcessing()
{
#define BUFSIZE 65536
Mutex::ScopedLock lock(agentLock);
@@ -536,9 +572,12 @@ void ManagementAgentImpl::PeriodicProcessing()
string routingKey;
std::list<ObjectId> deleteList;
+ if (!connected)
+ return;
+
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'h');
+ encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
contentSize = BUFSIZE - msgBuffer.available();
@@ -573,7 +612,7 @@ void ManagementAgentImpl::PeriodicProcessing()
if (object->getConfigChanged() || object->isDeleted())
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'c');
+ encodeHeader(msgBuffer, 'c');
object->writeProperties(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available();
@@ -585,7 +624,7 @@ void ManagementAgentImpl::PeriodicProcessing()
if (object->getInstChanged())
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'i');
+ encodeHeader(msgBuffer, 'i');
object->writeStatistics(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available();
@@ -664,8 +703,8 @@ ManagementAgentImpl::ConnectionThread::~ConnectionThread()
void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
uint32_t length,
- string exchange,
- string routingKey)
+ const string& exchange,
+ const string& routingKey)
{
{
Mutex::ScopedLock _lock(connLock);
@@ -683,10 +722,10 @@ void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
session.messageTransfer(arg::content=msg, arg::destination=exchange);
}
-void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank)
+void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
{
stringstream key;
- key << "agent." << agentBank;
+ key << "agent." << brokerBank << "." << agentBank;
session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(),
arg::bindingKey=key.str());
}
@@ -695,28 +734,7 @@ void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank)
void ManagementAgentImpl::PublishThread::run()
{
while (true) {
- ::sleep(5);
- agent.PeriodicProcessing();
+ ::sleep(agent.getInterval());
+ agent.periodicProcessing();
}
}
-
-Mutex& ManagementAgentImpl::getMutex()
-{
- return agentLock;
-}
-
-Buffer* ManagementAgentImpl::startEventLH()
-{
- Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
- EncodeHeader(*outBuffer, 'e');
- outBuffer->putLongLong(uint64_t(Duration(now())));
- return outBuffer;
-}
-
-void ManagementAgentImpl::finishEventLH(Buffer* outBuffer)
-{
- uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
- outBuffer->reset();
- connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event");
- delete outBuffer;
-}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index 7d9be6daf9..a964694690 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -43,24 +43,34 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
public:
ManagementAgentImpl();
- virtual ~ManagementAgentImpl();
+ virtual ~ManagementAgentImpl() {};
+ //
+ // Methods from ManagementAgent
+ //
int getMaxThreads() { return 1; }
void init(std::string brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
bool useExternalThread = false,
std::string storeFile = "");
- void RegisterClass(std::string packageName,
- std::string className,
- uint8_t* md5Sum,
+ bool isConnected() { return connected; }
+ std::string& getLastFailure() { return lastFailure; }
+ void registerClass(std::string& packageName,
+ std::string& className,
+ uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
- ObjectId addObject (management::ManagementObject* objectPtr,
- uint64_t persistId = 0);
- uint32_t pollCallbacks (uint32_t callLimit = 0);
- int getSignalFd (void);
+ void registerEvent(std::string& packageName,
+ std::string& eventName,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
+ void raiseEvent(const management::ManagementEvent& event);
+ uint32_t pollCallbacks(uint32_t callLimit = 0);
+ int getSignalFd();
- void PeriodicProcessing();
+ uint16_t getInterval() { return interval; }
+ void periodicProcessing();
private:
@@ -84,8 +94,10 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
struct SchemaClass {
management::ManagementObject::writeSchemaCall_t writeSchemaCall;
+ uint8_t kind;
- SchemaClass () : writeSchemaCall(0) {}
+ SchemaClass(const management::ManagementObject::writeSchemaCall_t call,
+ const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {}
};
struct QueuedMethod {
@@ -120,12 +132,16 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
framing::Uuid systemId;
std::string host;
uint16_t port;
+ bool connected;
+ std::string lastFailure;
+
+ bool clientWasAdded;
+ uint32_t requestedBrokerBank;
+ uint32_t requestedAgentBank;
+ uint32_t assignedBrokerBank;
+ uint32_t assignedAgentBank;
+ uint16_t bootSequence;
- bool clientWasAdded;
- uint32_t requestedBank;
- uint32_t assignedBank;
- uint32_t brokerBank;
- uint16_t bootSequence;
# define MA_BUFFER_SIZE 65536
char outputBuffer[MA_BUFFER_SIZE];
char eventBuffer[MA_BUFFER_SIZE];
@@ -148,9 +164,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
~ConnectionThread();
void sendBuffer(qpid::framing::Buffer& buf,
uint32_t length,
- std::string exchange,
- std::string routingKey);
- void bindToBank(uint32_t agentBank);
+ const std::string& exchange,
+ const std::string& routingKey);
+ void bindToBank(uint32_t brokerBank, uint32_t agentBank);
};
class PublishThread : public sys::Runnable
@@ -171,19 +187,20 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void startProtocol();
void storeData(bool requested=false);
void retrieveData();
- PackageMap::iterator FindOrAddPackage (std::string name);
+ PackageMap::iterator findOrAddPackage(const std::string& name);
void moveNewObjectsLH();
- void AddClassLocal (PackageMap::iterator pIter,
- std::string className,
+ void addClassLocal (uint8_t classKind,
+ PackageMap::iterator pIter,
+ const std::string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
- void EncodePackageIndication (qpid::framing::Buffer& buf,
+ void encodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
- void EncodeClassIndication (qpid::framing::Buffer& buf,
+ void encodeClassIndication (framing::Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter);
- void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendCommandComplete (std::string replyToKey, uint32_t sequence,
uint32_t code = 0, std::string text = std::string("OK"));
void handleAttachResponse (qpid::framing::Buffer& inBuffer);
@@ -194,9 +211,6 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleConsoleAddedIndication();
- sys::Mutex& getMutex();
- framing::Buffer* startEventLH();
- void finishEventLH(framing::Buffer* outBuffer);
};
}}