summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-01-02 15:56:20 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-01-02 15:56:20 +0000
commit3fe6853a7029e48f693c0853e51af33be5c79aec (patch)
tree6139a715591aabc91370350aa26f854639a2aa11 /cpp/src/qpid/management
parent8bc0b992a0e67259a7d9c525bbbbbc32fbc60a20 (diff)
downloadqpid-python-3fe6853a7029e48f693c0853e51af33be5c79aec.tar.gz
patch-715 (tross)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@608135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp85
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h8
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h2
-rw-r--r--cpp/src/qpid/management/ManagementObject.h16
4 files changed, 79 insertions, 32 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 90da74404b..d3c5d7c266 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -41,6 +41,8 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
}
+ManagementAgent::~ManagementAgent () {}
+
void ManagementAgent::enableManagement (void)
{
enabled = 1;
@@ -54,6 +56,16 @@ ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
return agent;
}
+void ManagementAgent::shutdown (void)
+{
+ if (agent.get () != 0)
+ {
+ agent->mExchange.reset ();
+ agent->dExchange.reset ();
+ agent.reset ();
+ }
+}
+
void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange,
broker::Exchange::shared_ptr _dexchange)
{
@@ -73,6 +85,8 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object)
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
: TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
+ManagementAgent::Periodic::~Periodic () {}
+
void ManagementAgent::Periodic::fire ()
{
agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
@@ -93,12 +107,27 @@ void ManagementAgent::clientAdded (void)
}
}
-void ManagementAgent::EncodeHeader (Buffer& buf)
+void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls)
{
buf.putOctet ('A');
buf.putOctet ('M');
buf.putOctet ('0');
buf.putOctet ('1');
+ buf.putOctet (opcode);
+ buf.putOctet (cls);
+}
+
+bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls)
+{
+ uint8_t h1 = buf.getOctet ();
+ uint8_t h2 = buf.getOctet ();
+ uint8_t h3 = buf.getOctet ();
+ uint8_t h4 = buf.getOctet ();
+
+ *opcode = buf.getOctet ();
+ *cls = buf.getOctet ();
+
+ return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1';
}
void ManagementAgent::SendBuffer (Buffer& buf,
@@ -112,9 +141,6 @@ void ManagementAgent::SendBuffer (Buffer& buf,
AMQFrame header (in_place<AMQHeaderBody>());
AMQFrame content(in_place<AMQContentBody>());
- QPID_LOG (debug, "ManagementAgent::SendBuffer - key="
- << routingKey << " len=" << length);
-
content.castBody<AMQContentBody>()->decode(buf, length);
method.setEof (false);
@@ -156,9 +182,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getSchemaNeeded ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('S'); // opcode = Schema Record
- msgBuffer.putOctet (0); // content-class = N/A
+ EncodeHeader (msgBuffer, 'S');
object->writeSchema (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -170,9 +194,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('C'); // content-class = Configuration
+ EncodeHeader (msgBuffer, 'C', 'C');
object->writeConfig (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -184,9 +206,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (object->getInstChanged ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer);
- msgBuffer.putOctet ('C'); // opcode = Content Record
- msgBuffer.putOctet ('I'); // content-class = Instrumentation
+ EncodeHeader (msgBuffer, 'C', 'I');
object->writeInstrumentation (msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
@@ -251,9 +271,6 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
start = pos + 1;
string methodName = routingKey.substr (start, routingKey.length () - start);
- QPID_LOG (debug, "Dispatch package: " << packageName << ", class: "
- << className << ", method: " << methodName);
-
contentSize = msg.encodedContentSize ();
if (contentSize < 8 || contentSize > 65536)
return;
@@ -263,19 +280,41 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
Buffer inBuffer (inMem, contentSize);
Buffer outBuffer (outMem, 4096);
uint32_t outLen;
+ uint8_t opcode, unused;
msg.encodeContent (inBuffer);
inBuffer.reset ();
- uint32_t methodId = inBuffer.getLong ();
- uint64_t objId = inBuffer.getLongLong ();
- string replyTo;
+ if (!CheckHeader (inBuffer, &opcode, &unused))
+ {
+ QPID_LOG (debug, " Invalid content header");
+ return;
+ }
+
+ if (opcode != 'M')
+ {
+ QPID_LOG (debug, " Unexpected opcode " << opcode);
+ return;
+ }
- inBuffer.getShortString (replyTo);
+ uint32_t methodId = inBuffer.getLong ();
+ uint64_t objId = inBuffer.getLongLong ();
+ string replyToKey;
- QPID_LOG (debug, " len = " << contentSize << ", methodId = " <<
- methodId << ", objId = " << objId);
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo())
+ {
+ const framing::ReplyTo& rt = p->getReplyTo ();
+ replyToKey = rt.getRoutingKey ();
+ }
+ else
+ {
+ QPID_LOG (debug, " Reply-to missing");
+ return;
+ }
+ EncodeHeader (outBuffer, 'R');
outBuffer.putLong (methodId);
ManagementObjectMap::iterator iter = managementObjects.find (objId);
@@ -291,7 +330,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
outLen = 4096 - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyTo);
+ SendBuffer (outBuffer, outLen, dExchange, replyToKey);
free (inMem);
}
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index a4f10632da..36ba1f0542 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -40,10 +40,13 @@ class ManagementAgent
public:
+ virtual ~ManagementAgent ();
+
typedef boost::shared_ptr<ManagementAgent> shared_ptr;
static void enableManagement (void);
static shared_ptr getAgent (void);
+ static void shutdown (void);
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (broker::Exchange::shared_ptr mgmtExchange,
@@ -61,7 +64,7 @@ class ManagementAgent
ManagementAgent& agent;
Periodic (ManagementAgent& agent, uint32_t seconds);
- ~Periodic () {}
+ virtual ~Periodic ();
void fire ();
};
@@ -77,7 +80,8 @@ class ManagementAgent
uint64_t nextObjectId;
void PeriodicProcessing (void);
- void EncodeHeader (qpid::framing::Buffer& buf);
+ void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0);
+ bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls);
void SendBuffer (qpid::framing::Buffer& buf,
uint32_t length,
broker::Exchange::shared_ptr exchange,
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
index 0fcd65b092..1a79482c9d 100644
--- a/cpp/src/qpid/management/ManagementExchange.h
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -40,7 +40,7 @@ class ManagementExchange : public virtual TopicExchange
const qpid::framing::FieldTable& _args,
Manageable* _parent = 0);
- virtual std::string getType() const { return typeName; }
+ virtual std::string getType() const { return typeName; }
virtual bool bind (Queue::shared_ptr queue,
const string& routingKey,
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
index ff136c397d..a32055721d 100644
--- a/cpp/src/qpid/management/ManagementObject.h
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -46,12 +46,16 @@ class ManagementObject
Manageable* coreObject;
std::string className;
- static const uint8_t TYPE_U8 = 1;
- static const uint8_t TYPE_U16 = 2;
- static const uint8_t TYPE_U32 = 3;
- static const uint8_t TYPE_U64 = 4;
- static const uint8_t TYPE_SSTR = 6;
- static const uint8_t TYPE_LSTR = 7;
+ static const uint8_t TYPE_U8 = 1;
+ static const uint8_t TYPE_U16 = 2;
+ static const uint8_t TYPE_U32 = 3;
+ static const uint8_t TYPE_U64 = 4;
+ static const uint8_t TYPE_SSTR = 6;
+ static const uint8_t TYPE_LSTR = 7;
+ static const uint8_t TYPE_ABSTIME = 8;
+ static const uint8_t TYPE_DELTATIME = 9;
+ static const uint8_t TYPE_REF = 10;
+ static const uint8_t TYPE_BOOL = 11;
static const uint8_t ACCESS_RC = 1;
static const uint8_t ACCESS_RW = 2;