diff options
| author | Ted Ross <tross@apache.org> | 2008-09-03 18:01:44 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-09-03 18:01:44 +0000 |
| commit | 13191e7951b29453c75db8384391d26f0402139d (patch) | |
| tree | b3ca5bd5f9c8523924cf9d1a0fe118099bef9894 /cpp/src/qpid/agent | |
| parent | b8cf7ea1034fa6216183f03022c83efdb154f3ee (diff) | |
| download | qpid-python-13191e7951b29453c75db8384391d26f0402139d.tar.gz | |
QPID-1174 Updates to the management framework
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@691700 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgent.h | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 585 | ||||
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 101 |
3 files changed, 488 insertions, 215 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h index e7379e6c94..1c219f7463 100644 --- a/cpp/src/qpid/agent/ManagementAgent.h +++ b/cpp/src/qpid/agent/ManagementAgent.h @@ -65,10 +65,14 @@ class ManagementAgent // agent's thread. In this case, the callback implementations // MUST be thread safe. // + // storeFile - File where this process has read and write access. This + // file shall be used to store persistent state. + // virtual void init (std::string brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, - bool useExternalThread = false) = 0; + bool useExternalThread = false, + std::string storeFile = "") = 0; // Register a schema with the management agent. This is normally called by the // package initializer generated by the management code generator. @@ -93,9 +97,8 @@ class ManagementAgent // pointer. This allows the management agent to report the deletion of the object // in an orderly way. // - virtual uint64_t addObject (ManagementObject* objectPtr, - uint32_t persistId = 0, - uint32_t persistBank = 4) = 0; + virtual ObjectId addObject (ManagementObject* objectPtr, + uint64_t persistId = 0) = 0; // If "useExternalThread" was set to true in init, this method must // be called to provide a thread for any pending method calls that have arrived. @@ -120,6 +123,12 @@ class ManagementAgent // virtual int getSignalFd (void) = 0; +protected: + friend class ManagementObject; + virtual sys::Mutex& getMutex() = 0; + virtual framing::Buffer* startEventLH() = 0; + virtual void finishEventLH(framing::Buffer* buf) = 0; + }; }} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index ebdc71e3b1..c4108b0ae2 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -24,12 +24,21 @@ #include <list> #include <unistd.h> #include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <unistd.h> +#include <fcntl.h> +#include <iostream> +#include <fstream> + using namespace qpid::client; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::sys; using std::stringstream; +using std::ofstream; +using std::ifstream; using std::string; using std::cout; using std::endl; @@ -66,128 +75,186 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() return agent; } +const string ManagementAgentImpl::storeMagicNumber("MA01"); + ManagementAgentImpl::ManagementAgentImpl() : - clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false) + extThread(false), writeFd(-1), readFd(-1), + clientWasAdded(true), requestedBank(0), + assignedBank(0), brokerBank(0), bootSequence(0), + connThreadBody(*this), connThread(connThreadBody), + pubThreadBody(*this), pubThread(pubThreadBody) { // TODO: Establish system ID } -void ManagementAgentImpl::init(std::string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread) +void ManagementAgentImpl::init(string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread, + string _storeFile) { - { - Mutex::ScopedLock lock(agentLock); - startupWait = true; - } - interval = intervalSeconds; extThread = useExternalThread; + storeFile = _storeFile; nextObjectId = 1; + host = brokerHost; + port = brokerPort; + + // TODO: Abstract the socket calls for portability + if (extThread) { + int pair[2]; + int result = socketpair(PF_LOCAL, SOCK_STREAM, 0, pair); + if (result == -1) { + return; + } + writeFd = pair[0]; + readFd = pair[1]; - sessionId.generate(); - queueName << "qmfagent-" << sessionId; - string dest = "qmfagent"; - - connection.open(brokerHost.c_str(), brokerPort); - session = connection.newSession (queueName.str()); - dispatcher = new client::Dispatcher(session); + // Set the readFd to non-blocking + int flags = fcntl(readFd, F_GETFL); + fcntl(readFd, F_SETFL, flags | O_NONBLOCK); + } + retrieveData(); + bootSequence++; + if ((bootSequence & 0xF000) != 0) + bootSequence = 1; + storeData(true); +} - session.queueDeclare (arg::queue=queueName.str()); - session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str()); - session.messageSubscribe (arg::queue=queueName.str(), - arg::destination=dest); - session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); - session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); +ManagementAgentImpl::~ManagementAgentImpl() +{ +} - Message attachRequest; - char rawbuffer[512]; - Buffer buffer (rawbuffer, 512); +void ManagementAgentImpl::RegisterClass(std::string packageName, + std::string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(agentLock); + PackageMap::iterator pIter = FindOrAddPackage(packageName); + AddClassLocal(pIter, className, md5Sum, schemaCall); +} - attachRequest.getDeliveryProperties().setRoutingKey("broker"); - attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); +ObjectId ManagementAgentImpl::addObject(ManagementObject* object, + uint64_t persistId) +{ + Mutex::ScopedLock lock(addLock); + uint16_t sequence = persistId ? 0 : bootSequence; + uint64_t objectNum = persistId ? persistId : nextObjectId++; - EncodeHeader (buffer, 'A'); - buffer.putShortString ("RemoteAgent [C++]"); - systemId.encode (buffer); - buffer.putLong (11); + ObjectId objectId(&attachment, 0, sequence, objectNum); - size_t length = 512 - buffer.available (); - string stringBuffer (rawbuffer, length); - attachRequest.setData (stringBuffer); + // TODO: fix object-id handling + object->setObjectId(objectId); + newManagementObjects[objectId] = object; + return objectId; +} - session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management"); +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) +{ + Mutex::ScopedLock lock(agentLock); - dispatcher->listen(dest, this); - dispatcher->start(); + for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) { + if (methodQueue.empty()) + break; - { - Mutex::ScopedLock lock(agentLock); - if (startupWait) - startupCond.wait(agentLock); + QueuedMethod* item = methodQueue.front(); + methodQueue.pop_front(); + { + Mutex::ScopedUnlock unlock(agentLock); + Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size()); + invokeMethodRequest(inBuffer, item->sequence, item->replyTo); + delete item; + } } + + uint8_t rbuf[100]; + while (read(readFd, rbuf, 100) > 0); // Consume all signaling bytes + return methodQueue.size(); } -ManagementAgentImpl::~ManagementAgentImpl() +int ManagementAgentImpl::getSignalFd(void) { - dispatcher->stop(); - session.close(); - delete dispatcher; + return readFd; } -void ManagementAgentImpl::RegisterClass (std::string packageName, - std::string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) -{ - Mutex::ScopedLock lock(agentLock); - PackageMap::iterator pIter = FindOrAddPackage (packageName); - AddClassLocal (pIter, className, md5Sum, schemaCall); +void ManagementAgentImpl::startProtocol() +{ + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + EncodeHeader(buffer, 'A'); + buffer.putShortString("RemoteAgent [C++]"); + systemId.encode (buffer); + buffer.putLong(requestedBank); + uint32_t length = 512 - buffer.available(); + buffer.reset(); + connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker"); } -uint64_t ManagementAgentImpl::addObject (ManagementObject* object, - uint32_t /*persistId*/, - uint32_t /*persistBank*/) +void ManagementAgentImpl::storeData(bool requested) { - Mutex::ScopedLock lock(addLock); - uint64_t objectId; + if (!storeFile.empty()) { + ofstream outFile(storeFile.c_str()); + uint32_t bankToWrite = requested ? requestedBank : assignedBank; - // TODO: fix object-id handling - objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF); - object->setObjectId (objectId); - newManagementObjects[objectId] = object; - return objectId; + if (outFile.good()) { + outFile << storeMagicNumber << " " << bankToWrite << " " << bootSequence << endl; + outFile.close(); + } + } } -uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/) +void ManagementAgentImpl::retrieveData() { - return 0; + if (!storeFile.empty()) { + ifstream inFile(storeFile.c_str()); + string mn; + + if (inFile.good()) { + inFile >> mn; + if (mn == storeMagicNumber) { + inFile >> requestedBank; + inFile >> bootSequence; + } + inFile.close(); + } + } } -int ManagementAgentImpl::getSignalFd(void) +void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence, + uint32_t code, string text) { - return -1; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'z', sequence); + outBuffer.putLong(code); + outBuffer.putShortString(text); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey); } void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); - uint32_t assigned; - stringstream key; - assigned = inBuffer.getLong(); - objIdPrefix = ((uint64_t) assigned) << 24; + brokerBank = inBuffer.getLong(); + assignedBank = inBuffer.getLong(); + if (assignedBank != requestedBank) { + if (requestedBank == 0) + cout << "Initial object-id bank assigned: " << assignedBank << endl; + else + cout << "Collision in object-id! New bank assigned: " << assignedBank << endl; + storeData(); + } - startupWait = false; - startupCond.notify(); + attachment.setBanks(brokerBank, assignedBank); // Bind to qpid.management to receive commands - key << "agent." << assigned; - session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(), - arg::bindingKey=key.str()); + connThreadBody.bindToBank(assignedBank); // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); @@ -198,9 +265,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) EncodeHeader(outBuffer, 'p'); EncodePackageIndication(outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); // Send class indications for all local classes ClassMap cMap = pIter->second; @@ -208,9 +275,9 @@ void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) outBuffer.reset(); EncodeHeader(outBuffer, 'q'); EncodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -236,9 +303,9 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc EncodeHeader(outBuffer, 's', sequence); schema.writeSchemaCall(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "broker"); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -249,28 +316,93 @@ void ManagementAgentImpl::handleConsoleAddedIndication() clientWasAdded = true; } -void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) { string methodName; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + string packageName; + string className; + uint8_t hash[16]; + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - uint64_t objId = inBuffer.getLongLong(); + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); inBuffer.getShortString(methodName); EncodeHeader(outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT)); } else { - iter->second->doMethod(methodName, inBuffer, outBuffer); + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + } + else + iter->second->doMethod(methodName, inBuffer, outBuffer); } outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "amq.direct", replyTo); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); +} + +void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + ft.decode(inBuffer); + value = ft.get("_class"); + if (value.get() == 0 || !value->convertsTo<string>()) + { + // TODO: Send completion with an error code + return; + } + + string className(value->get<string>()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) + { + ManagementObject* object = iter->second; + if (object->getClassName() == className) + { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + EncodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + } + } + + sendCommandComplete(replyTo, sequence); +} + +void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + if (extThread) { + Mutex::ScopedLock lock(agentLock); + string body; + + inBuffer.getRawData(body, inBuffer.available()); + methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); + write(writeFd, "X", 1); + } else { + invokeMethodRequest(inBuffer, sequence, replyTo); + } } void ManagementAgentImpl::received(Message& msg) @@ -287,103 +419,86 @@ void ManagementAgentImpl::received(Message& msg) replyToKey = rt.getRoutingKey(); } - if (CheckHeader (inBuffer, &opcode, &sequence)) + if (CheckHeader(inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey); else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); } } -void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +void ManagementAgentImpl::EncodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) { - buf.putOctet ('A'); - buf.putOctet ('M'); - buf.putOctet ('1'); - buf.putOctet (opcode); - buf.putLong (seq); + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('1'); + buf.putOctet(opcode); + buf.putLong (seq); } -bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +bool ManagementAgentImpl::CheckHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) { if (buf.getSize() < 8) return false; - uint8_t h1 = buf.getOctet (); - uint8_t h2 = buf.getOctet (); - uint8_t h3 = buf.getOctet (); + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); - *opcode = buf.getOctet (); - *seq = buf.getLong (); + *opcode = buf.getOctet(); + *seq = buf.getLong(); return h1 == 'A' && h2 == 'M' && h3 == '1'; } -void ManagementAgentImpl::SendBuffer (Buffer& buf, - uint32_t length, - string exchange, - string routingKey) -{ - Message msg; - string data; - - if (objIdPrefix == 0) - return; - - buf.getRawData(data, length); - msg.getDeliveryProperties().setRoutingKey(routingKey); - msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); - msg.setData (data); - session.messageTransfer (arg::content=msg, arg::destination=exchange); -} - -ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name) +ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage(std::string name) { - PackageMap::iterator pIter = packages.find (name); - if (pIter != packages.end ()) + PackageMap::iterator pIter = packages.find(name); + if (pIter != packages.end()) return pIter; // No such package found, create a new map entry. std::pair<PackageMap::iterator, bool> result = - packages.insert (std::pair<string, ClassMap> (name, ClassMap ())); + packages.insert(std::pair<string, ClassMap>(name, ClassMap())); // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t outLen; - EncodeHeader (outBuffer, 'p'); - EncodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package"); + EncodeHeader(outBuffer, 'p'); + EncodePackageIndication(outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "mgmt.schema.package"); return result.first; } void ManagementAgentImpl::moveNewObjectsLH() { - Mutex::ScopedLock lock (addLock); - for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); - iter != newManagementObjects.end (); + Mutex::ScopedLock lock(addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin(); + iter != newManagementObjects.end(); iter++) managementObjects[iter->first] = iter->second; newManagementObjects.clear(); } -void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, - string className, - uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) +void ManagementAgentImpl::AddClassLocal(PackageMap::iterator pIter, + string className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; key.name = className; - memcpy (&key.hash, md5Sum, 16); + memcpy(&key.hash, md5Sum, 16); - ClassMap::iterator cIter = cMap.find (key); - if (cIter != cMap.end ()) + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) return; // No such class found, create a new class with local information. @@ -395,21 +510,21 @@ void ManagementAgentImpl::AddClassLocal (PackageMap::iterator pIter, // TODO: Publish a class-indication message } -void ManagementAgentImpl::EncodePackageIndication (Buffer& buf, - PackageMap::iterator pIter) +void ManagementAgentImpl::EncodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) { - buf.putShortString ((*pIter).first); + buf.putShortString((*pIter).first); } -void ManagementAgentImpl::EncodeClassIndication (Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) +void ManagementAgentImpl::EncodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) { SchemaClassKey key = (*cIter).first; - buf.putShortString ((*pIter).first); - buf.putShortString (key.name); - buf.putBin128 (key.hash); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128 (key.hash); } void ManagementAgentImpl::PeriodicProcessing() @@ -419,17 +534,17 @@ void ManagementAgentImpl::PeriodicProcessing() char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; - std::list<uint64_t> deleteList; + std::list<ObjectId> deleteList; { Buffer msgBuffer(msgChars, BUFSIZE); EncodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(Duration(now()))); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); routingKey = "mgmt." + systemId.str() + ".heartbeat"; - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } moveNewObjectsLH(); @@ -437,65 +552,171 @@ void ManagementAgentImpl::PeriodicProcessing() if (clientWasAdded) { clientWasAdded = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; - object->setAllChanged (); + object->setAllChanged(); } } - if (managementObjects.empty ()) + if (managementObjects.empty()) return; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; - if (object->getConfigChanged () || object->isDeleted ()) + if (object->getConfigChanged() || object->isDeleted()) { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'c'); + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'c'); object->writeProperties(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); + routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - if (object->getInstChanged ()) + if (object->getInstChanged()) { - Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer, 'i'); + Buffer msgBuffer(msgChars, BUFSIZE); + EncodeHeader(msgBuffer, 'i'); object->writeStatistics(msgBuffer); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName (); - SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey); + contentSize = BUFSIZE - msgBuffer.available(); + msgBuffer.reset(); + routingKey = "mgmt." + systemId.str() + ".stat." + object->getClassName(); + connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey); } - if (object->isDeleted ()) - deleteList.push_back (iter->first); + if (object->isDeleted()) + deleteList.push_back(iter->first); } // Delete flagged objects - for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); + for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); iter++) - managementObjects.erase (*iter); + managementObjects.erase(*iter); - deleteList.clear (); + deleteList.clear(); } -void ManagementAgentImpl::BackgroundThread::run() +void ManagementAgentImpl::ConnectionThread::run() +{ + static const int delayMin(1); + static const int delayMax(128); + static const int delayFactor(2); + int delay(delayMin); + string dest("qmfagent"); + + sessionId.generate(); + queueName << "qmfagent-" << sessionId; + + while (true) { + try { + if (!agent.host.empty()) { + connection.open(agent.host.c_str(), agent.port); + session = connection.newSession(queueName.str()); + subscriptions = new client::SubscriptionManager(session); + + session.queueDeclare(arg::queue=queueName.str()); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), + arg::bindingKey=queueName.str()); + + subscriptions->subscribe(agent, queueName.str(), dest); + { + Mutex::ScopedLock _lock(connLock); + operational = true; + agent.startProtocol(); + try { + Mutex::ScopedUnlock _unlock(connLock); + subscriptions->run(); + } catch (std::exception) {} + + operational = false; + } + delay = delayMin; + delete subscriptions; + subscriptions = 0; + session.close(); + } + } catch (std::exception &e) { + if (delay < delayMax) + delay *= delayFactor; + } + + ::sleep(delay); + } +} + +ManagementAgentImpl::ConnectionThread::~ConnectionThread() +{ + if (subscriptions != 0) { + delete subscriptions; + } +} + +void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf, + uint32_t length, + string exchange, + string routingKey) +{ + { + Mutex::ScopedLock _lock(connLock); + if (!operational) + return; + } + + Message msg; + string data; + + buf.getRawData(data, length); + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); + msg.setData(data); + session.messageTransfer(arg::content=msg, arg::destination=exchange); +} + +void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t agentBank) +{ + stringstream key; + key << "agent." << agentBank; + session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); +} + + +void ManagementAgentImpl::PublishThread::run() { while (true) { ::sleep(5); agent.PeriodicProcessing(); } } + +Mutex& ManagementAgentImpl::getMutex() +{ + return agentLock; +} + +Buffer* ManagementAgentImpl::startEventLH() +{ + Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE)); + EncodeHeader(*outBuffer, 'e'); + outBuffer->putLongLong(uint64_t(Duration(now()))); + return outBuffer; +} + +void ManagementAgentImpl::finishEventLH(Buffer* outBuffer) +{ + uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available(); + outBuffer->reset(); + connThreadBody.sendBuffer(*outBuffer, outLen, "qpid.management", "mgmt.event"); + delete outBuffer; +} diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index f7f19e145d..7d9be6daf9 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -22,7 +22,7 @@ #include "ManagementAgent.h" #include "qpid/client/Connection.h" -#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/client/Session.h" #include "qpid/client/AsyncSession.h" #include "qpid/client/Message.h" @@ -30,10 +30,10 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Condition.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> +#include <deque> namespace qpid { namespace management { @@ -49,14 +49,14 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void init(std::string brokerHost = "localhost", uint16_t brokerPort = 5672, uint16_t intervalSeconds = 10, - bool useExternalThread = false); + bool useExternalThread = false, + std::string storeFile = ""); void RegisterClass(std::string packageName, std::string className, uint8_t* md5Sum, management::ManagementObject::writeSchemaCall_t schemaCall); - uint64_t addObject (management::ManagementObject* objectPtr, - uint32_t persistId = 0, - uint32_t persistBank = 4); + ObjectId addObject (management::ManagementObject* objectPtr, + uint64_t persistId = 0); uint32_t pollCallbacks (uint32_t callLimit = 0); int getSignalFd (void); @@ -64,14 +64,12 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen private: - struct SchemaClassKey - { + struct SchemaClassKey { std::string name; uint8_t hash[16]; }; - struct SchemaClassKeyComp - { + struct SchemaClassKeyComp { bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const { if (lhs.name != rhs.name) @@ -84,53 +82,95 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen } }; - struct SchemaClass - { + struct SchemaClass { management::ManagementObject::writeSchemaCall_t writeSchemaCall; SchemaClass () : writeSchemaCall(0) {} }; + struct QueuedMethod { + QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) : + sequence(_seq), replyTo(_reply), body(_body) {} + + uint32_t sequence; + std::string replyTo; + std::string body; + }; + + typedef std::deque<QueuedMethod*> MethodQueue; typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; typedef std::map<std::string, ClassMap> PackageMap; PackageMap packages; + AgentAttachment attachment; management::ManagementObjectMap managementObjects; management::ManagementObjectMap newManagementObjects; + MethodQueue methodQueue; void received (client::Message& msg); uint16_t interval; bool extThread; + int writeFd; + int readFd; uint64_t nextObjectId; + std::string storeFile; sys::Mutex agentLock; sys::Mutex addLock; - framing::Uuid sessionId; framing::Uuid systemId; + std::string host; + uint16_t port; - int signalFdIn, signalFdOut; - client::Connection connection; - client::Session session; - client::Dispatcher* dispatcher; bool clientWasAdded; - uint64_t objIdPrefix; - std::stringstream queueName; + uint32_t requestedBank; + uint32_t assignedBank; + uint32_t brokerBank; + uint16_t bootSequence; # define MA_BUFFER_SIZE 65536 char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; - class BackgroundThread : public sys::Runnable + friend class ConnectionThread; + class ConnectionThread : public sys::Runnable { + bool operational; ManagementAgentImpl& agent; + framing::Uuid sessionId; + client::Connection connection; + client::Session session; + client::SubscriptionManager* subscriptions; + std::stringstream queueName; + sys::Mutex connLock; void run(); public: - BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {} + ConnectionThread(ManagementAgentImpl& _agent) : + operational(false), agent(_agent), subscriptions(0) {} + ~ConnectionThread(); + void sendBuffer(qpid::framing::Buffer& buf, + uint32_t length, + std::string exchange, + std::string routingKey); + void bindToBank(uint32_t agentBank); }; - BackgroundThread bgThread; - sys::Thread thread; - sys::Condition startupCond; - bool startupWait; + class PublishThread : public sys::Runnable + { + ManagementAgentImpl& agent; + void run(); + public: + PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {} + }; + + ConnectionThread connThreadBody; + sys::Thread connThread; + PublishThread pubThreadBody; + sys::Thread pubThread; + + static const std::string storeMagicNumber; + void startProtocol(); + void storeData(bool requested=false); + void retrieveData(); PackageMap::iterator FindOrAddPackage (std::string name); void moveNewObjectsLH(); void AddClassLocal (PackageMap::iterator pIter, @@ -144,16 +184,19 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen ClassMap::iterator cIter); void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void SendBuffer (qpid::framing::Buffer& buf, - uint32_t length, - std::string exchange, - std::string routingKey); + void sendCommandComplete (std::string replyToKey, uint32_t sequence, + uint32_t code = 0, std::string text = std::string("OK")); void handleAttachResponse (qpid::framing::Buffer& inBuffer); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); + void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); + sys::Mutex& getMutex(); + framing::Buffer* startEventLH(); + void finishEventLH(framing::Buffer* outBuffer); }; }} |
