summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/agent
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-07-31 13:15:16 +0000
committerTed Ross <tross@apache.org>2008-07-31 13:15:16 +0000
commit9fb5fd7a0a800591c334bde2b9556e984217d7de (patch)
treec632c481f9cbf647d4ce453ff1076895866fc5e7 /cpp/src/qpid/agent
parent033f088884f2e6bbc08d6027e1507b6d67eaad53 (diff)
downloadqpid-python-9fb5fd7a0a800591c334bde2b9556e984217d7de.tar.gz
QPID-1174 - Management updates for remote agents
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681362 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/agent')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp97
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h4
2 files changed, 79 insertions, 22 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 5cff0fcd3c..85f13ba15d 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -67,16 +67,21 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
}
ManagementAgentImpl::ManagementAgentImpl() :
- clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread)
+ clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false)
{
// TODO: Establish system ID
}
-void ManagementAgentImpl::init (std::string brokerHost,
- uint16_t brokerPort,
- uint16_t intervalSeconds,
- bool useExternalThread)
+void ManagementAgentImpl::init(std::string brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread)
{
+ {
+ Mutex::ScopedLock lock(agentLock);
+ startupWait = true;
+ }
+
interval = intervalSeconds;
extThread = useExternalThread;
nextObjectId = 1;
@@ -92,17 +97,17 @@ void ManagementAgentImpl::init (std::string brokerHost,
session.queueDeclare (arg::queue=queueName.str());
session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
- arg::bindingKey=queueName.str ());
+ arg::bindingKey=queueName.str());
session.messageSubscribe (arg::queue=queueName.str(),
arg::destination=dest);
session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
Message attachRequest;
- char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream
+ char rawbuffer[512];
Buffer buffer (rawbuffer, 512);
- attachRequest.getDeliveryProperties().setRoutingKey("agent");
+ attachRequest.getDeliveryProperties().setRoutingKey("broker");
attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
EncodeHeader (buffer, 'A');
@@ -115,15 +120,22 @@ void ManagementAgentImpl::init (std::string brokerHost,
string stringBuffer (rawbuffer, length);
attachRequest.setData (stringBuffer);
- session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management");
+ session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management");
- dispatcher->listen (dest, this);
- dispatcher->start ();
+ dispatcher->listen(dest, this);
+ dispatcher->start();
+
+ {
+ Mutex::ScopedLock lock(agentLock);
+ if (startupWait)
+ startupCond.wait(agentLock);
+ }
}
-ManagementAgentImpl::~ManagementAgentImpl ()
+ManagementAgentImpl::~ManagementAgentImpl()
{
- dispatcher->stop ();
+ dispatcher->stop();
+ session.close();
delete dispatcher;
}
@@ -151,24 +163,33 @@ uint64_t ManagementAgentImpl::addObject (ManagementObject* object,
return objectId;
}
-uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/)
+uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/)
{
return 0;
}
-int ManagementAgentImpl::getSignalFd (void)
+int ManagementAgentImpl::getSignalFd(void)
{
return -1;
}
-void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer)
+void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
{
Mutex::ScopedLock lock(agentLock);
uint32_t assigned;
+ stringstream key;
assigned = inBuffer.getLong();
objIdPrefix = ((uint64_t) assigned) << 24;
+ startupWait = false;
+ startupCond.notify();
+
+ // Bind to qpid.management to receive commands
+ key << "agent." << assigned;
+ session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(),
+ arg::bindingKey=key.str());
+
// Send package indications for all local packages
for (PackageMap::iterator pIter = packages.begin();
pIter != packages.end();
@@ -180,7 +201,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer)
EncodePackageIndication(outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ SendBuffer(outBuffer, outLen, "qpid.management", "broker");
// Send class indications for all local classes
ClassMap cMap = pIter->second;
@@ -190,7 +211,7 @@ void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer)
EncodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ SendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -218,7 +239,7 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
schema.writeSchemaCall(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ SendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -229,18 +250,50 @@ void ManagementAgentImpl::handleConsoleAddedIndication()
clientWasAdded = true;
}
-void ManagementAgentImpl::received (Message& msg)
+void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+ string methodName;
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ uint64_t objId = inBuffer.getLongLong();
+ inBuffer.getShortString(methodName);
+
+ EncodeHeader(outBuffer, 'm', sequence);
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+ } else {
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
+ }
+
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+}
+
+void ManagementAgentImpl::received(Message& msg)
{
- string data = msg.getData ();
- Buffer inBuffer (const_cast<char*>(data.c_str()), data.size());
+ string data = msg.getData();
+ Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
uint8_t opcode;
uint32_t sequence;
+ string replyToKey;
+
+ framing::MessageProperties p = msg.getMessageProperties();
+ if (p.hasReplyTo()) {
+ const framing::ReplyTo& rt = p.getReplyTo();
+ replyToKey = rt.getRoutingKey();
+ }
if (CheckHeader (inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
else if (opcode == 'x') handleConsoleAddedIndication();
+ else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
}
}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index 2ecf63cd5d..f7f19e145d 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -30,6 +30,7 @@
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
#include "qpid/framing/Uuid.h"
#include <iostream>
#include <sstream>
@@ -127,6 +128,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
BackgroundThread bgThread;
sys::Thread thread;
+ sys::Condition startupCond;
+ bool startupWait;
PackageMap::iterator FindOrAddPackage (std::string name);
void moveNewObjectsLH();
@@ -149,6 +152,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+ void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleConsoleAddedIndication();
};