summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/agent
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-09-03 18:01:44 +0000
committerTed Ross <tross@apache.org>2008-09-03 18:01:44 +0000
commit13191e7951b29453c75db8384391d26f0402139d (patch)
treeb3ca5bd5f9c8523924cf9d1a0fe118099bef9894 /cpp/src/qpid/agent
parentb8cf7ea1034fa6216183f03022c83efdb154f3ee (diff)
downloadqpid-python-13191e7951b29453c75db8384391d26f0402139d.tar.gz
QPID-1174 Updates to the management framework
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@691700 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
-rw-r--r--cpp/src/qpid/agent/ManagementAgent.h17
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp585
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h101
3 files changed, 488 insertions, 215 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h
index e7379e6c94..1c219f7463 100644
--- a/cpp/src/qpid/agent/ManagementAgent.h
+++ b/cpp/src/qpid/agent/ManagementAgent.h
@@ -65,10 +65,14 @@ class ManagementAgent
// agent's thread. In this case, the callback implementations
// MUST be thread safe.
//
+ // storeFile - File where this process has read and write access. This
+ // file shall be used to store persistent state.
+ //
virtual void init (std::string brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
- bool useExternalThread = false) = 0;
+ bool useExternalThread = false,
+ std::string storeFile = "") = 0;
// Register a schema with the management agent. This is normally called by the
// package initializer generated by the management code generator.
@@ -93,9 +97,8 @@ class ManagementAgent
// pointer. This allows the management agent to report the deletion of the object
// in an orderly way.
//
- virtual uint64_t addObject (ManagementObject* objectPtr,
- uint32_t persistId = 0,
- uint32_t persistBank = 4) = 0;
+ virtual ObjectId addObject (ManagementObject* objectPtr,
+ uint64_t persistId = 0) = 0;
// If "useExternalThread" was set to true in init, this method must
// be called to provide a thread for any pending method calls that have arrived.
@@ -120,6 +123,12 @@ class ManagementAgent
//
virtual int getSignalFd (void) = 0;
+protected:
+ friend class ManagementObject;
+ virtual sys::Mutex& getMutex() = 0;
+ virtual framing::Buffer* startEventLH() = 0;
+ virtual void finishEventLH(framing::Buffer* buf) = 0;
+
};
}}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index ebdc71e3b1..c4108b0ae2 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -24,12 +24,21 @@
#include <list>
#include <unistd.h>
#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <iostream>
+#include <fstream>
+
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::sys;
using std::stringstream;
+using std::ofstream;
+using std::ifstream;
using std::string;
using std::cout;
using std::endl;
@@ -66,128 +75,186 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
return agent;
}
+const string ManagementAgentImpl::storeMagicNumber("MA01");
+
ManagementAgentImpl::ManagementAgentImpl() :
- clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false)
+ extThread(false), writeFd(-1), readFd(-1),
+ clientWasAdded(true), requestedBank(0),
+ assignedBank(0), brokerBank(0), bootSequence(0),
+ connThreadBody(*this), connThread(connThreadBody),
+ pubThreadBody(*this), pubThread(pubThreadBody)
{
// TODO: Establish system ID
}
-void ManagementAgentImpl::init(std::string brokerHost,
- uint16_t brokerPort,
- uint16_t intervalSeconds,
- bool useExternalThread)
+void ManagementAgentImpl::init(string brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread,
+ string _storeFile)
{
- {
- Mutex::ScopedLock lock(agentLock);
- startupWait = true;
- }
-
interval = intervalSeconds;
extThread = useExternalThread;
+ storeFile = _storeFile;
nextObjectId = 1;
+ host = brokerHost;
+ port = brokerPort;
+
+ // TODO: Abstract the socket calls for portability
+ if (extThread) {
+ int pair[2];
+ int result = socketpair(PF_LOCAL, SOCK_STREAM, 0, pair);
+ if (result == -1) {
+ return;
+ }
+ writeFd = pair[0];
+ readFd = pair[1];
- sessionId.generate();
- queueName << "qmfagent-" << sessionId;
- string dest = "qmfagent";
-
- connection.open(brokerHost.c_str(), brokerPort);
- session = connection.newSession (queueName.str());
- dispatcher = new client::Dispatcher(session);
+ // Set the readFd to non-blocking
+ int flags = fcntl(readFd, F_GETFL);
+ fcntl(readFd, F_SETFL, flags | O_NONBLOCK);
+ }
+ retrieveData();
+ bootSequence++;
+ if ((bootSequence & 0xF000) != 0)
+ bootSequence = 1;
+ storeData(true);
+}
- session.queueDeclare (arg::queue=queueName.str());
- session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
- arg::bindingKey=queueName.str());
- session.messageSubscribe (arg::queue=queueName.str(),
- arg::destination=dest);
- session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
- session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
+ManagementAgentImpl::~ManagementAgentImpl()
+{
+}
- Message attachRequest;
- char rawbuffer[512];
- Buffer buffer (rawbuffer, 512);
+void ManagementAgentImpl::RegisterClass(std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(agentLock);
+ PackageMap::iterator pIter = FindOrAddPackage(packageName);
+ AddClassLocal(pIter, className, md5Sum, schemaCall);
+}
- attachRequest.getDeliveryProperties().setRoutingKey("broker");
- attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+ uint64_t persistId)
+{
+ Mutex::ScopedLock lock(addLock);
+ uint16_t sequence = persistId ? 0 : bootSequence;
+ uint64_t objectNum = persistId ? persistId : nextObjectId++;
- EncodeHeader (buffer, 'A');
- buffer.putShortString ("RemoteAgent [C++]");
- systemId.encode (buffer);
- buffer.putLong (11);
+ ObjectId objectId(&attachment, 0, sequence, objectNum);
- size_t length = 512 - buffer.available ();
- string stringBuffer (rawbuffer, length);
- attachRequest.setData (stringBuffer);
+ // TODO: fix object-id handling
+ object->setObjectId(objectId);
+ newManagementObjects[objectId] = object;
+ return objectId;
+}
- session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management");
+uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
+{
+ Mutex::ScopedLock lock(agentLock);
- dispatcher->listen(dest, this);
- dispatcher->start();
+ for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) {
+ if (methodQueue.empty())
+ break;
- {
- Mutex::ScopedLock lock(agentLock);
- if (startupWait)
- startupCond.wait(agentLock);
+ QueuedMethod* item = methodQueue.front();
+ methodQueue.pop_front();
+ {
+ Mutex::ScopedUnlock unlock(agentLock);
+ Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
+ invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+ delete item;
+ }
}
+
+ uint8_t rbuf[100];
+ while (read(readFd, rbuf, 100) > 0); // Consume all signaling bytes
+ return methodQueue.size();
}
-ManagementAgentImpl::~ManagementAgentImpl()
+int ManagementAgentImpl::getSignalFd(void)
{
- dispatcher->stop();
- session.close();
- delete dispatcher;
+ return readFd;
}
-void ManagementAgentImpl::RegisterClass (std::string packageName,
- std::string className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
-{
- Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = FindOrAddPackage (packageName);
- AddClassLocal (pIter, className, md5Sum, schemaCall);
+void ManagementAgentImpl::startProtocol()
+{
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
+
+ EncodeHeader(buffer, 'A');
+ buffer.putShortString("RemoteAgent [C++]");
+ systemId.encode (buffer);
+ buffer.putLong(requestedBank);
+ uint32_t length = 512 - buffer.available();
+ buffer.reset();
+ connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
}
-uint64_t ManagementAgentImpl::addObject (ManagementObject* object,
- uint32_t /*persistId*/,
- uint32_t /*persistBank*/)
+void ManagementAgentImpl::storeData(bool requested)
{
- Mutex::ScopedLock lock(addLock);
- uint64_t objectId;
+ if (!storeFile.empty()) {
+ ofstream outFile(storeFile.c_str());
+ uint32_t bankToWrite = requested ? requestedBank : assignedBank;
- // TODO: fix object-id handling
- objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF);
- object->setObjectId (objectId);
- newManagementObjects[objectId] = object;
- return objectId;
+ if (outFile.good()) {
+ outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl;
+ outFile.close();
+ }
+ }
}
-uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/)
+void ManagementAgentImpl::retrieveData()
{
- return 0;
+ if (!storeFile.empty()) {
+ ifstream inFile(storeFile.c_str());
+ string mn;
+
+ if (inFile.good()) {
+ inFile >> mn;
+ if (mn == storeMagicNumber) {
+ inFile >> requestedBank;
+ inFile >> bootSequence;
+ }
+ inFile.close();
+ }
+ }
}
-int ManagementAgentImpl::getSignalFd(void)
+void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
+ uint32_t code, string text)
{
- return -1;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'z', sequence);
+ outBuffer.putLong(code);
+ outBuffer.putShortString(text);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
}
void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
{
Mutex::ScopedLock lock(agentLock);
- uint32_t assigned;
- stringstream key;
- assigned = inBuffer.getLong();
- objIdPrefix = ((uint64_t) assigned) << 24;
+ brokerBank = inBuffer.getLong();
+ assignedBank = inBuffer.getLong();
+ if (assignedBank != requestedBank) {
+ if (requestedBank == 0)
+ cout << "Initial object-id bank assigned: " << assignedBank << endl;
+ else
+ cout << "Collision in object-id! New bank assigned: " << assignedBank << endl;
+ storeData();
+ }
- startupWait = false;
- startupCond.notify();
+ attachment.setBanks(brokerBank, assignedBank);
// Bind to qpid.management to receive commands
- key << "agent." << assigned;
- session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(),
- arg::bindingKey=key.str());
+ connThreadBody.bindToBank(assignedBank);
// Send package indications for all local packages
for (PackageMap::iterator pIter = packages.begin();
@@ -198,9 +265,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
EncodeHeader(outBuffer, 'p');
EncodePackageIndication(outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
// Send class indications for all local classes
ClassMap cMap = pIter->second;
@@ -208,9 +275,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
outBuffer.reset();
EncodeHeader(outBuffer, 'q');
EncodeClassIndication(outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -236,9 +303,9 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
EncodeHeader(outBuffer, 's', sequence);
schema.writeSchemaCall(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -249,28 +316,93 @@ void ManagementAgentImpl::handleConsoleAddedIndication()
clientWasAdded = true;
}
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
{
string methodName;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- uint64_t objId = inBuffer.getLongLong();
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
EncodeHeader(outBuffer, 'm', sequence);
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+ outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
+ outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
} else {
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ if ((iter->second->getPackageName() != packageName) ||
+ (iter->second->getClassName() != className)) {
+ outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+ }
+ else
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+}
+
+void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+ FieldTable ft;
+ FieldTable::ValuePtr value;
+
+ moveNewObjectsLH();
+
+ ft.decode(inBuffer);
+ value = ft.get("_class");
+ if (value.get() == 0 || !value->convertsTo<string>())
+ {
+ // TODO: Send completion with an error code
+ return;
+ }
+
+ string className(value->get<string>());
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++)
+ {
+ ManagementObject* object = iter->second;
+ if (object->getClassName() == className)
+ {
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'g', sequence);
+ object->writeProperties(outBuffer);
+ object->writeStatistics(outBuffer, true);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ }
+ }
+
+ sendCommandComplete(replyTo, sequence);
+}
+
+void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+ if (extThread) {
+ Mutex::ScopedLock lock(agentLock);
+ string body;
+
+ inBuffer.getRawData(body, inBuffer.available());
+ methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+ write(writeFd, "X", 1);
+ } else {
+ invokeMethodRequest(inBuffer, sequence, replyTo);
+ }
}
void ManagementAgentImpl::received(Message& msg)
@@ -287,103 +419,86 @@ void ManagementAgentImpl::received(Message& msg)
replyToKey = rt.getRoutingKey();
}
- if (CheckHeader (inBuffer, &opcode, &sequence))
+ if (CheckHeader(inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
else if (opcode == 'x') handleConsoleAddedIndication();
+ else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
}
}
-void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
{
- buf.putOctet ('A');
- buf.putOctet ('M');
- buf.putOctet ('1');
- buf.putOctet (opcode);
- buf.putLong (seq);
+ buf.putOctet('A');
+ buf.putOctet('M');
+ buf.putOctet('1');
+ buf.putOctet(opcode);
+ buf.putLong (seq);
}
-bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
if (buf.getSize() < 8)
return false;
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
return h1 == 'A' && h2 == 'M' && h3 == '1';
}
-void ManagementAgentImpl::SendBuffer (Buffer& buf,
- uint32_t length,
- string exchange,
- string routingKey)
-{
- Message msg;
- string data;
-
- if (objIdPrefix == 0)
- return;
-
- buf.getRawData(data, length);
- msg.getDeliveryProperties().setRoutingKey(routingKey);
- msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.setData (data);
- session.messageTransfer (arg::content=msg, arg::destination=exchange);
-}
-
-ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name)
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name)
{
- PackageMap::iterator pIter = packages.find (name);
- if (pIter != packages.end ())
+ PackageMap::iterator pIter = packages.find(name);
+ if (pIter != packages.end())
return pIter;
// No such package found, create a new map entry.
std::pair<PackageMap::iterator, bool> result =
- packages.insert (std::pair<string, ClassMap> (name, ClassMap ()));
+ packages.insert(std::pair<string, ClassMap>(name, ClassMap()));
// Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package");
+ EncodeHeader(outBuffer, 'p');
+ EncodePackageIndication(outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package");
return result.first;
}
void ManagementAgentImpl::moveNewObjectsLH()
{
- Mutex::ScopedLock lock (addLock);
- for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
- iter != newManagementObjects.end ();
+ Mutex::ScopedLock lock(addLock);
+ for (ManagementObjectMap::iterator iter = newManagementObjects.begin();
+ iter != newManagementObjects.end();
iter++)
managementObjects[iter->first] = iter->second;
newManagementObjects.clear();
}
-void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter,
+ string className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
key.name = className;
- memcpy (&key.hash, md5Sum, 16);
+ memcpy(&key.hash, md5Sum, 16);
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end())
return;
// No such class found, create a new class with local information.
@@ -395,21 +510,21 @@ void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter,
// TODO: Publish a class-indication message
}
-void ManagementAgentImpl::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
+void ManagementAgentImpl::EncodePackageIndication(Buffer& buf,
+ PackageMap::iterator pIter)
{
- buf.putShortString ((*pIter).first);
+ buf.putShortString((*pIter).first);
}
-void ManagementAgentImpl::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
+void ManagementAgentImpl::EncodeClassIndication(Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
{
SchemaClassKey key = (*cIter).first;
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
+ buf.putShortString((*pIter).first);
+ buf.putShortString(key.name);
+ buf.putBin128 (key.hash);
}
void ManagementAgentImpl::PeriodicProcessing()
@@ -419,17 +534,17 @@ void ManagementAgentImpl::PeriodicProcessing()
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
- std::list<uint64_t> deleteList;
+ std::list<ObjectId> deleteList;
{
Buffer msgBuffer(msgChars, BUFSIZE);
EncodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
routingKey = "mgmt." + systemId.str() + ".heartbeat";
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
moveNewObjectsLH();
@@ -437,65 +552,171 @@ void ManagementAgentImpl::PeriodicProcessing()
if (clientWasAdded)
{
clientWasAdded = false;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
iter++)
{
ManagementObject* object = iter->second;
- object->setAllChanged ();
+ object->setAllChanged();
}
}
- if (managementObjects.empty ())
+ if (managementObjects.empty())
return;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
iter++)
{
ManagementObject* object = iter->second;
- if (object->getConfigChanged () || object->isDeleted ())
+ if (object->getConfigChanged() || object->isDeleted())
{
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ EncodeHeader(msgBuffer, 'c');
object->writeProperties(msgBuffer);
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
+ routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName();
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
- if (object->getInstChanged ())
+ if (object->getInstChanged())
{
- Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ EncodeHeader(msgBuffer, 'i');
object->writeStatistics(msgBuffer);
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+ contentSize = BUFSIZE - msgBuffer.available();
+ msgBuffer.reset();
+ routingKey = "mgmt." + systemId.str() + ".stat." + object->getClassName();
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
}
- if (object->isDeleted ())
- deleteList.push_back (iter->first);
+ if (object->isDeleted())
+ deleteList.push_back(iter->first);
}
// Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
- iter != deleteList.rend ();
+ for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
iter++)
- managementObjects.erase (*iter);
+ managementObjects.erase(*iter);
- deleteList.clear ();
+ deleteList.clear();
}
-void ManagementAgentImpl::BackgroundThread::run()
+void ManagementAgentImpl::ConnectionThread::run()
+{
+ static const int delayMin(1);
+ static const int delayMax(128);
+ static const int delayFactor(2);
+ int delay(delayMin);
+ string dest("qmfagent");
+
+ sessionId.generate();
+ queueName << "qmfagent-" << sessionId;
+
+ while (true) {
+ try {
+ if (!agent.host.empty()) {
+ connection.open(agent.host.c_str(), agent.port);
+ session = connection.newSession(queueName.str());
+ subscriptions = new client::SubscriptionManager(session);
+
+ session.queueDeclare(arg::queue=queueName.str());
+ session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
+ arg::bindingKey=queueName.str());
+
+ subscriptions->subscribe(agent, queueName.str(), dest);
+ {
+ Mutex::ScopedLock _lock(connLock);
+ operational = true;
+ agent.startProtocol();
+ try {
+ Mutex::ScopedUnlock _unlock(connLock);
+ subscriptions->run();
+ } catch (std::exception) {}
+
+ operational = false;
+ }
+ delay = delayMin;
+ delete subscriptions;
+ subscriptions = 0;
+ session.close();
+ }
+ } catch (std::exception &e) {
+ if (delay < delayMax)
+ delay *= delayFactor;
+ }
+
+ ::sleep(delay);
+ }
+}
+
+ManagementAgentImpl::ConnectionThread::~ConnectionThread()
+{
+ if (subscriptions != 0) {
+ delete subscriptions;
+ }
+}
+
+void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
+ uint32_t length,
+ string exchange,
+ string routingKey)
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (!operational)
+ return;
+ }
+
+ Message msg;
+ string data;
+
+ buf.getRawData(data, length);
+ msg.getDeliveryProperties().setRoutingKey(routingKey);
+ msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+ msg.setData(data);
+ session.messageTransfer(arg::content=msg, arg::destination=exchange);
+}
+
+void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank)
+{
+ stringstream key;
+ key << "agent." << agentBank;
+ session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(),
+ arg::bindingKey=key.str());
+}
+
+
+void ManagementAgentImpl::PublishThread::run()
{
while (true) {
::sleep(5);
agent.PeriodicProcessing();
}
}
+
+Mutex& ManagementAgentImpl::getMutex()
+{
+ return agentLock;
+}
+
+Buffer* ManagementAgentImpl::startEventLH()
+{
+ Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
+ EncodeHeader(*outBuffer, 'e');
+ outBuffer->putLongLong(uint64_t(Duration(now())));
+ return outBuffer;
+}
+
+void ManagementAgentImpl::finishEventLH(Buffer* outBuffer)
+{
+ uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
+ outBuffer->reset();
+ connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event");
+ delete outBuffer;
+}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index f7f19e145d..7d9be6daf9 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -22,7 +22,7 @@
#include "ManagementAgent.h"
#include "qpid/client/Connection.h"
-#include "qpid/client/Dispatcher.h"
+#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Session.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/client/Message.h"
@@ -30,10 +30,10 @@
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Condition.h"
#include "qpid/framing/Uuid.h"
#include <iostream>
#include <sstream>
+#include <deque>
namespace qpid {
namespace management {
@@ -49,14 +49,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void init(std::string brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
- bool useExternalThread = false);
+ bool useExternalThread = false,
+ std::string storeFile = "");
void RegisterClass(std::string packageName,
std::string className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
- uint64_t addObject (management::ManagementObject* objectPtr,
- uint32_t persistId = 0,
- uint32_t persistBank = 4);
+ ObjectId addObject (management::ManagementObject* objectPtr,
+ uint64_t persistId = 0);
uint32_t pollCallbacks (uint32_t callLimit = 0);
int getSignalFd (void);
@@ -64,14 +64,12 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
private:
- struct SchemaClassKey
- {
+ struct SchemaClassKey {
std::string name;
uint8_t hash[16];
};
- struct SchemaClassKeyComp
- {
+ struct SchemaClassKeyComp {
bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
{
if (lhs.name != rhs.name)
@@ -84,53 +82,95 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
}
};
- struct SchemaClass
- {
+ struct SchemaClass {
management::ManagementObject::writeSchemaCall_t writeSchemaCall;
SchemaClass () : writeSchemaCall(0) {}
};
+ struct QueuedMethod {
+ QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
+ sequence(_seq), replyTo(_reply), body(_body) {}
+
+ uint32_t sequence;
+ std::string replyTo;
+ std::string body;
+ };
+
+ typedef std::deque<QueuedMethod*> MethodQueue;
typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
PackageMap packages;
+ AgentAttachment attachment;
management::ManagementObjectMap managementObjects;
management::ManagementObjectMap newManagementObjects;
+ MethodQueue methodQueue;
void received (client::Message& msg);
uint16_t interval;
bool extThread;
+ int writeFd;
+ int readFd;
uint64_t nextObjectId;
+ std::string storeFile;
sys::Mutex agentLock;
sys::Mutex addLock;
- framing::Uuid sessionId;
framing::Uuid systemId;
+ std::string host;
+ uint16_t port;
- int signalFdIn, signalFdOut;
- client::Connection connection;
- client::Session session;
- client::Dispatcher* dispatcher;
bool clientWasAdded;
- uint64_t objIdPrefix;
- std::stringstream queueName;
+ uint32_t requestedBank;
+ uint32_t assignedBank;
+ uint32_t brokerBank;
+ uint16_t bootSequence;
# define MA_BUFFER_SIZE 65536
char outputBuffer[MA_BUFFER_SIZE];
+ char eventBuffer[MA_BUFFER_SIZE];
- class BackgroundThread : public sys::Runnable
+ friend class ConnectionThread;
+ class ConnectionThread : public sys::Runnable
{
+ bool operational;
ManagementAgentImpl& agent;
+ framing::Uuid sessionId;
+ client::Connection connection;
+ client::Session session;
+ client::SubscriptionManager* subscriptions;
+ std::stringstream queueName;
+ sys::Mutex connLock;
void run();
public:
- BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+ ConnectionThread(ManagementAgentImpl& _agent) :
+ operational(false), agent(_agent), subscriptions(0) {}
+ ~ConnectionThread();
+ void sendBuffer(qpid::framing::Buffer& buf,
+ uint32_t length,
+ std::string exchange,
+ std::string routingKey);
+ void bindToBank(uint32_t agentBank);
};
- BackgroundThread bgThread;
- sys::Thread thread;
- sys::Condition startupCond;
- bool startupWait;
+ class PublishThread : public sys::Runnable
+ {
+ ManagementAgentImpl& agent;
+ void run();
+ public:
+ PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+ };
+
+ ConnectionThread connThreadBody;
+ sys::Thread connThread;
+ PublishThread pubThreadBody;
+ sys::Thread pubThread;
+
+ static const std::string storeMagicNumber;
+ void startProtocol();
+ void storeData(bool requested=false);
+ void retrieveData();
PackageMap::iterator FindOrAddPackage (std::string name);
void moveNewObjectsLH();
void AddClassLocal (PackageMap::iterator pIter,
@@ -144,16 +184,19 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
ClassMap::iterator cIter);
void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void SendBuffer (qpid::framing::Buffer& buf,
- uint32_t length,
- std::string exchange,
- std::string routingKey);
+ void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+ uint32_t code = 0, std::string text = std::string("OK"));
void handleAttachResponse (qpid::framing::Buffer& inBuffer);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+ void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+ void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleConsoleAddedIndication();
+ sys::Mutex& getMutex();
+ framing::Buffer* startEventLH();
+ void finishEventLH(framing::Buffer* outBuffer);
};
}}