diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2008-01-02 15:56:20 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2008-01-02 15:56:20 +0000 |
| commit | 3fe6853a7029e48f693c0853e51af33be5c79aec (patch) | |
| tree | 6139a715591aabc91370350aa26f854639a2aa11 /cpp/src/qpid/management | |
| parent | 8bc0b992a0e67259a7d9c525bbbbbc32fbc60a20 (diff) | |
| download | qpid-python-3fe6853a7029e48f693c0853e51af33be5c79aec.tar.gz | |
patch-715 (tross)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@608135 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 85 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementExchange.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 16 |
4 files changed, 79 insertions, 32 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 90da74404b..d3c5d7c266 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -41,6 +41,8 @@ ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } +ManagementAgent::~ManagementAgent () {} + void ManagementAgent::enableManagement (void) { enabled = 1; @@ -54,6 +56,16 @@ ManagementAgent::shared_ptr ManagementAgent::getAgent (void) return agent; } +void ManagementAgent::shutdown (void) +{ + if (agent.get () != 0) + { + agent->mExchange.reset (); + agent->dExchange.reset (); + agent.reset (); + } +} + void ManagementAgent::setExchange (broker::Exchange::shared_ptr _mexchange, broker::Exchange::shared_ptr _dexchange) { @@ -73,6 +85,8 @@ void ManagementAgent::addObject (ManagementObject::shared_ptr object) ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} +ManagementAgent::Periodic::~Periodic () {} + void ManagementAgent::Periodic::fire () { agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval))); @@ -93,12 +107,27 @@ void ManagementAgent::clientAdded (void) } } -void ManagementAgent::EncodeHeader (Buffer& buf) +void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls) { buf.putOctet ('A'); buf.putOctet ('M'); buf.putOctet ('0'); buf.putOctet ('1'); + buf.putOctet (opcode); + buf.putOctet (cls); +} + +bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls) +{ + uint8_t h1 = buf.getOctet (); + uint8_t h2 = buf.getOctet (); + uint8_t h3 = buf.getOctet (); + uint8_t h4 = buf.getOctet (); + + *opcode = buf.getOctet (); + *cls = buf.getOctet (); + + return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1'; } void ManagementAgent::SendBuffer (Buffer& buf, @@ -112,9 +141,6 @@ void ManagementAgent::SendBuffer (Buffer& buf, AMQFrame header (in_place<AMQHeaderBody>()); AMQFrame content(in_place<AMQContentBody>()); - QPID_LOG (debug, "ManagementAgent::SendBuffer - key=" - << routingKey << " len=" << length); - content.castBody<AMQContentBody>()->decode(buf, length); method.setEof (false); @@ -156,9 +182,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getSchemaNeeded ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('S'); // opcode = Schema Record - msgBuffer.putOctet (0); // content-class = N/A + EncodeHeader (msgBuffer, 'S'); object->writeSchema (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -170,9 +194,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getConfigChanged () || object->isDeleted ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('C'); // content-class = Configuration + EncodeHeader (msgBuffer, 'C', 'C'); object->writeConfig (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -184,9 +206,7 @@ void ManagementAgent::PeriodicProcessing (void) if (object->getInstChanged ()) { Buffer msgBuffer (msgChars, BUFSIZE); - EncodeHeader (msgBuffer); - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('I'); // content-class = Instrumentation + EncodeHeader (msgBuffer, 'C', 'I'); object->writeInstrumentation (msgBuffer); contentSize = BUFSIZE - msgBuffer.available (); @@ -251,9 +271,6 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, start = pos + 1; string methodName = routingKey.substr (start, routingKey.length () - start); - QPID_LOG (debug, "Dispatch package: " << packageName << ", class: " - << className << ", method: " << methodName); - contentSize = msg.encodedContentSize (); if (contentSize < 8 || contentSize > 65536) return; @@ -263,19 +280,41 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, Buffer inBuffer (inMem, contentSize); Buffer outBuffer (outMem, 4096); uint32_t outLen; + uint8_t opcode, unused; msg.encodeContent (inBuffer); inBuffer.reset (); - uint32_t methodId = inBuffer.getLong (); - uint64_t objId = inBuffer.getLongLong (); - string replyTo; + if (!CheckHeader (inBuffer, &opcode, &unused)) + { + QPID_LOG (debug, " Invalid content header"); + return; + } + + if (opcode != 'M') + { + QPID_LOG (debug, " Unexpected opcode " << opcode); + return; + } - inBuffer.getShortString (replyTo); + uint32_t methodId = inBuffer.getLong (); + uint64_t objId = inBuffer.getLongLong (); + string replyToKey; - QPID_LOG (debug, " len = " << contentSize << ", methodId = " << - methodId << ", objId = " << objId); + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get<framing::MessageProperties>(); + if (p && p->hasReplyTo()) + { + const framing::ReplyTo& rt = p->getReplyTo (); + replyToKey = rt.getRoutingKey (); + } + else + { + QPID_LOG (debug, " Reply-to missing"); + return; + } + EncodeHeader (outBuffer, 'R'); outBuffer.putLong (methodId); ManagementObjectMap::iterator iter = managementObjects.find (objId); @@ -291,7 +330,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable, outLen = 4096 - outBuffer.available (); outBuffer.reset (); - SendBuffer (outBuffer, outLen, dExchange, replyTo); + SendBuffer (outBuffer, outLen, dExchange, replyToKey); free (inMem); } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index a4f10632da..36ba1f0542 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -40,10 +40,13 @@ class ManagementAgent public: + virtual ~ManagementAgent (); + typedef boost::shared_ptr<ManagementAgent> shared_ptr; static void enableManagement (void); static shared_ptr getAgent (void); + static void shutdown (void); void setInterval (uint16_t _interval) { interval = _interval; } void setExchange (broker::Exchange::shared_ptr mgmtExchange, @@ -61,7 +64,7 @@ class ManagementAgent ManagementAgent& agent; Periodic (ManagementAgent& agent, uint32_t seconds); - ~Periodic () {} + virtual ~Periodic (); void fire (); }; @@ -77,7 +80,8 @@ class ManagementAgent uint64_t nextObjectId; void PeriodicProcessing (void); - void EncodeHeader (qpid::framing::Buffer& buf); + void EncodeHeader (qpid::framing::Buffer& buf, uint8_t opcode, uint8_t cls = 0); + bool CheckHeader (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls); void SendBuffer (qpid::framing::Buffer& buf, uint32_t length, broker::Exchange::shared_ptr exchange, diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index 0fcd65b092..1a79482c9d 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -40,7 +40,7 @@ class ManagementExchange : public virtual TopicExchange const qpid::framing::FieldTable& _args, Manageable* _parent = 0); - virtual std::string getType() const { return typeName; } + virtual std::string getType() const { return typeName; } virtual bool bind (Queue::shared_ptr queue, const string& routingKey, diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index ff136c397d..a32055721d 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -46,12 +46,16 @@ class ManagementObject Manageable* coreObject; std::string className; - static const uint8_t TYPE_U8 = 1; - static const uint8_t TYPE_U16 = 2; - static const uint8_t TYPE_U32 = 3; - static const uint8_t TYPE_U64 = 4; - static const uint8_t TYPE_SSTR = 6; - static const uint8_t TYPE_LSTR = 7; + static const uint8_t TYPE_U8 = 1; + static const uint8_t TYPE_U16 = 2; + static const uint8_t TYPE_U32 = 3; + static const uint8_t TYPE_U64 = 4; + static const uint8_t TYPE_SSTR = 6; + static const uint8_t TYPE_LSTR = 7; + static const uint8_t TYPE_ABSTIME = 8; + static const uint8_t TYPE_DELTATIME = 9; + static const uint8_t TYPE_REF = 10; + static const uint8_t TYPE_BOOL = 11; static const uint8_t ACCESS_RC = 1; static const uint8_t ACCESS_RW = 2; |
