diff options
| author | Ted Ross <tross@apache.org> | 2008-07-31 13:15:16 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-07-31 13:15:16 +0000 |
| commit | 9fb5fd7a0a800591c334bde2b9556e984217d7de (patch) | |
| tree | c632c481f9cbf647d4ce453ff1076895866fc5e7 /cpp/src/qpid/agent | |
| parent | 033f088884f2e6bbc08d6027e1507b6d67eaad53 (diff) | |
| download | qpid-python-9fb5fd7a0a800591c334bde2b9556e984217d7de.tar.gz | |
QPID-1174 - Management updates for remote agents
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681362 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 97 | ||||
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 4 |
2 files changed, 79 insertions, 22 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 5cff0fcd3c..85f13ba15d 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -67,16 +67,21 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() } ManagementAgentImpl::ManagementAgentImpl() : - clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread) + clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false) { // TODO: Establish system ID } -void ManagementAgentImpl::init (std::string brokerHost, - uint16_t brokerPort, - uint16_t intervalSeconds, - bool useExternalThread) +void ManagementAgentImpl::init(std::string brokerHost, + uint16_t brokerPort, + uint16_t intervalSeconds, + bool useExternalThread) { + { + Mutex::ScopedLock lock(agentLock); + startupWait = true; + } + interval = intervalSeconds; extThread = useExternalThread; nextObjectId = 1; @@ -92,17 +97,17 @@ void ManagementAgentImpl::init (std::string brokerHost, session.queueDeclare (arg::queue=queueName.str()); session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(), - arg::bindingKey=queueName.str ()); + arg::bindingKey=queueName.str()); session.messageSubscribe (arg::queue=queueName.str(), arg::destination=dest); session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF); session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF); Message attachRequest; - char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream + char rawbuffer[512]; Buffer buffer (rawbuffer, 512); - attachRequest.getDeliveryProperties().setRoutingKey("agent"); + attachRequest.getDeliveryProperties().setRoutingKey("broker"); attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str())); EncodeHeader (buffer, 'A'); @@ -115,15 +120,22 @@ void ManagementAgentImpl::init (std::string brokerHost, string stringBuffer (rawbuffer, length); attachRequest.setData (stringBuffer); - session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management"); + session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management"); - dispatcher->listen (dest, this); - dispatcher->start (); + dispatcher->listen(dest, this); + dispatcher->start(); + + { + Mutex::ScopedLock lock(agentLock); + if (startupWait) + startupCond.wait(agentLock); + } } -ManagementAgentImpl::~ManagementAgentImpl () +ManagementAgentImpl::~ManagementAgentImpl() { - dispatcher->stop (); + dispatcher->stop(); + session.close(); delete dispatcher; } @@ -151,24 +163,33 @@ uint64_t ManagementAgentImpl::addObject (ManagementObject* object, return objectId; } -uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/) +uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/) { return 0; } -int ManagementAgentImpl::getSignalFd (void) +int ManagementAgentImpl::getSignalFd(void) { return -1; } -void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) +void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer) { Mutex::ScopedLock lock(agentLock); uint32_t assigned; + stringstream key; assigned = inBuffer.getLong(); objIdPrefix = ((uint64_t) assigned) << 24; + startupWait = false; + startupCond.notify(); + + // Bind to qpid.management to receive commands + key << "agent." << assigned; + session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(), + arg::bindingKey=key.str()); + // Send package indications for all local packages for (PackageMap::iterator pIter = packages.begin(); pIter != packages.end(); @@ -180,7 +201,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) EncodePackageIndication(outBuffer, pIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); // Send class indications for all local classes ClassMap cMap = pIter->second; @@ -190,7 +211,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer) EncodeClassIndication(outBuffer, pIter, cIter); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -218,7 +239,7 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc schema.writeSchemaCall(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available (); outBuffer.reset(); - SendBuffer(outBuffer, outLen, "qpid.management", "agent"); + SendBuffer(outBuffer, outLen, "qpid.management", "broker"); } } } @@ -229,18 +250,50 @@ void ManagementAgentImpl::handleConsoleAddedIndication() clientWasAdded = true; } -void ManagementAgentImpl::received (Message& msg) +void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo) +{ + string methodName; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + uint64_t objId = inBuffer.getLongLong(); + inBuffer.getShortString(methodName); + + EncodeHeader(outBuffer, 'm', sequence); + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } else { + iter->second->doMethod(methodName, inBuffer, outBuffer); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + SendBuffer(outBuffer, outLen, "amq.direct", replyTo); +} + +void ManagementAgentImpl::received(Message& msg) { - string data = msg.getData (); - Buffer inBuffer (const_cast<char*>(data.c_str()), data.size()); + string data = msg.getData(); + Buffer inBuffer(const_cast<char*>(data.c_str()), data.size()); uint8_t opcode; uint32_t sequence; + string replyToKey; + + framing::MessageProperties p = msg.getMessageProperties(); + if (p.hasReplyTo()) { + const framing::ReplyTo& rt = p.getReplyTo(); + replyToKey = rt.getRoutingKey(); + } if (CheckHeader (inBuffer, &opcode, &sequence)) { if (opcode == 'a') handleAttachResponse(inBuffer); else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence); else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey); } } diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 2ecf63cd5d..f7f19e145d 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -30,6 +30,7 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> @@ -127,6 +128,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen BackgroundThread bgThread; sys::Thread thread; + sys::Condition startupCond; + bool startupWait; PackageMap::iterator FindOrAddPackage (std::string name); void moveNewObjectsLH(); @@ -149,6 +152,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence); + void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo); void handleConsoleAddedIndication(); }; |
