diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-13 00:34:09 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-13 00:34:09 +0000 |
| commit | b4a562164bfbf627c9bf9e802ea2baa33d12521a (patch) | |
| tree | e3e1d0ed46174cf61e15569659c97a3c93ac6b97 /cpp/src/qpid/management | |
| parent | 6128b62ed47c825dba3f7a36ccdb60b55044ea2e (diff) | |
| download | qpid-python-b4a562164bfbf627c9bf9e802ea2baa33d12521a.tar.gz | |
Patch QPID-680 from tross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594364 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/Args.h | 39 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ArgsBrokerEcho.h | 38 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Broker.cpp | 126 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Broker.h | 76 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Manageable.h | 67 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 325 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 81 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementExchange.cpp | 77 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementExchange.h | 61 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 61 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.h | 125 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Queue.cpp | 189 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Queue.h | 184 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Vhost.cpp | 57 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Vhost.h | 65 |
15 files changed, 1571 insertions, 0 deletions
diff --git a/cpp/src/qpid/management/Args.h b/cpp/src/qpid/management/Args.h new file mode 100644 index 0000000000..75d0b4dd70 --- /dev/null +++ b/cpp/src/qpid/management/Args.h @@ -0,0 +1,39 @@ +#ifndef _Args_ +#define _Args_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + + +namespace qpid { +namespace management { + +class Args +{ + public: + + virtual ~Args (void) = 0; + +}; + +inline Args::~Args (void) {} + +}} + +#endif /*!_Args_*/ diff --git a/cpp/src/qpid/management/ArgsBrokerEcho.h b/cpp/src/qpid/management/ArgsBrokerEcho.h new file mode 100644 index 0000000000..ad9d7e0813 --- /dev/null +++ b/cpp/src/qpid/management/ArgsBrokerEcho.h @@ -0,0 +1,38 @@ +#ifndef _ArgsBrokerEcho_ +#define _ArgsBrokerEcho_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "Args.h" +#include <string> + +namespace qpid { +namespace management { + +class ArgsBrokerEcho : public Args +{ + public: + uint32_t io_sequence; + std::string io_body; +}; + +}} + +#endif /*!_ArgsBrokerEcho_*/ diff --git a/cpp/src/qpid/management/Broker.cpp b/cpp/src/qpid/management/Broker.cpp new file mode 100644 index 0000000000..8626654c43 --- /dev/null +++ b/cpp/src/qpid/management/Broker.cpp @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "config.h" +#include "qpid/broker/Broker.h" +#include "Broker.h" +#include "ArgsBrokerEcho.h" + +using namespace qpid::management; +using namespace qpid::sys; +using namespace qpid::framing; + +bool Broker::schemaNeeded = true; + +Broker::Broker (Manageable* _core, const Options& _conf) : + ManagementObject (_core) +{ + broker::Broker::Options& conf = (broker::Broker::Options&) _conf; + + sysId = "sysId"; + port = conf.port; + workerThreads = conf.workerThreads; + maxConns = conf.maxConnections; + connBacklog = conf.connectionBacklog; + stagingThreshold = conf.stagingThreshold; + storeLib = conf.store; + asyncStore = conf.storeAsync; + mgmtPubInterval = conf.mgmtPubInterval; + initialDiskPageSize = 0; + initialPagesPerQueue = 0; + clusterName = ""; + version = PACKAGE_VERSION; +} + +Broker::~Broker () {} + +void Broker::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaListBegin (buf); + schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true); + schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true); + schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true); + schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true); + schemaItem (buf, TYPE_UINT16, "connBacklog", + "Connection backlog limit for listening socket", true); + schemaItem (buf, TYPE_UINT32, "stagingThreshold", + "Broker stages messages over this size to disk", true); + schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true); + schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true); + schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true); + schemaItem (buf, TYPE_UINT32, "initialDiskPageSize", + "Number of disk pages allocated for storage", true); + schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue", + "Number of disk pages allocated per queue", true); + schemaItem (buf, TYPE_STRING, "clusterName", + "Name of cluster this server is a member of, zero-length for standalone server", true); + schemaItem (buf, TYPE_STRING, "version", "Running software version", true); + schemaListEnd (buf); +} + +void Broker::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putLong (0); + buf.putShort (port); + buf.putShort (workerThreads); + buf.putShort (maxConns); + buf.putShort (connBacklog); + buf.putLong (stagingThreshold); + buf.putShortString (storeLib); + buf.putOctet (asyncStore ? 1 : 0); + buf.putShort (mgmtPubInterval); + buf.putLong (initialDiskPageSize); + buf.putLong (initialPagesPerQueue); + buf.putShortString (clusterName); + buf.putShortString (version); +} + +void Broker::doMethod (string methodName, + Buffer& inBuf, + Buffer& outBuf) +{ + if (methodName.compare ("echo") == 0) + { + ArgsBrokerEcho args; + uint32_t result; + + args.io_sequence = inBuf.getLong (); + inBuf.getLongString (args.io_body); + + result = coreObject->ManagementMethod (1, args); + + outBuf.putLong (result); + outBuf.putShortString ("OK"); + outBuf.putLong (args.io_sequence); + outBuf.putLongString (args.io_body); + } + else + { + outBuf.putLong (1); + outBuf.putShortString ("Unknown Method"); + } +} + diff --git a/cpp/src/qpid/management/Broker.h b/cpp/src/qpid/management/Broker.h new file mode 100644 index 0000000000..91fddd3724 --- /dev/null +++ b/cpp/src/qpid/management/Broker.h @@ -0,0 +1,76 @@ +#ifndef _ManagementBroker_ +#define _ManagementBroker_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" +#include "qpid/Options.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace management { + +class Broker : public ManagementObject +{ + public: + + typedef boost::shared_ptr<Broker> shared_ptr; + + Broker (Manageable* coreObject, const Options& conf); + ~Broker (void); + + private: + + static bool schemaNeeded; + + std::string sysId; + uint16_t port; + uint16_t workerThreads; + uint16_t maxConns; + uint16_t connBacklog; + uint32_t stagingThreshold; + std::string storeLib; + bool asyncStore; + uint16_t mgmtPubInterval; + uint32_t initialDiskPageSize; + uint32_t initialPagesPerQueue; + std::string clusterName; + std::string version; + + uint16_t getObjectType (void) { return OBJECT_BROKER; } + std::string getObjectName (void) { return "broker"; } + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string methodName, + qpid::framing::Buffer& inBuf, + qpid::framing::Buffer& outBuf); + + inline bool getInstChanged (void) { return false; } +}; + +}} + + +#endif /*!_ManagementBroker_*/ diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h new file mode 100644 index 0000000000..7c9b49be9a --- /dev/null +++ b/cpp/src/qpid/management/Manageable.h @@ -0,0 +1,67 @@ +#ifndef _Manageable_ +#define _Manageable_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "ManagementObject.h" +#include "Args.h" +#include "qpid/sys/Time.h" +#include <qpid/framing/Buffer.h> +#include <boost/shared_ptr.hpp> +#include <map> + +namespace qpid { +namespace management { + +class Manageable +{ + public: + + virtual ~Manageable (void) = 0; + + // status_t is a type used to pass completion status from the method handler. + // + typedef uint32_t status_t; + + static const status_t STATUS_OK = 0; + static const status_t STATUS_UNKNOWN_OBJECT = 1; + static const status_t STATUS_UNKNOWN_METHOD = 2; + + // Every "Manageable" object must hold a reference to exactly one + // management object. This object is always of a class derived from + // the pure-virtual "ManagementObject". + // + // This accessor function returns a shared_ptr to the management object. + // + virtual ManagementObject::shared_ptr GetManagementObject (void) const = 0; + + // Every "Manageable" object must implement ManagementMethod. This + // function is called when a remote management client invokes a method + // on this object. The input and output arguments are specific to the + // method being called and must be down-cast to the appropriate sub class + // before use. + virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0; +}; + +inline Manageable::~Manageable (void) {} + +}} + +#endif /*!_Manageable_*/ diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp new file mode 100644 index 0000000000..b5b4da09b8 --- /dev/null +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -0,0 +1,325 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementAgent.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include <qpid/broker/Message.h> +#include <qpid/broker/MessageDelivery.h> +#include <qpid/framing/AMQFrame.h> +#include <list> + +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::sys; + +ManagementAgent::shared_ptr ManagementAgent::agent; + +ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) +{ + timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); + nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); +} + +ManagementAgent::shared_ptr ManagementAgent::getAgent (void) +{ + if (agent.get () == 0) + agent = shared_ptr (new ManagementAgent (10)); + + return agent; +} + +void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange, + Exchange::shared_ptr _dexchange) +{ + mExchange = _mexchange; + dExchange = _dexchange; +} + +void ManagementAgent::addObject (ManagementObject::shared_ptr object) +{ + uint64_t objectId = nextObjectId++; + + object->setObjectId (objectId); + managementObjects[objectId] = object; +} + +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} + +void ManagementAgent::Periodic::fire () +{ + agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval))); + agent.PeriodicProcessing (); +} + +void ManagementAgent::clientAdded (void) +{ + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + object->setAllChanged (); + object->setSchemaNeeded (); + } +} + +void ManagementAgent::PeriodicProcessing (void) +{ +#define BUFSIZE 65536 +#define THRESHOLD 16384 + char msgChars[BUFSIZE]; + Buffer msgBuffer (msgChars, BUFSIZE); + uint32_t contentSize; + std::list<uint64_t> deleteList; + + if (managementObjects.empty ()) + return; + + Message::shared_ptr msg (new Message ()); + + // Build the magic number for the management message. + msgBuffer.putOctet ('A'); + msgBuffer.putOctet ('M'); + msgBuffer.putOctet ('0'); + msgBuffer.putOctet ('1'); + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + + if (object->getSchemaNeeded ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('S'); // opcode = Schema Record + msgBuffer.putOctet (0); // content-class = N/A + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeSchema (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getConfigChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('C'); // content-class = Configuration + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeConfig (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getInstChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('I'); // content-class = Instrumentation + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeInstrumentation (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->isDeleted ()) + deleteList.push_back (iter->first); + + // Temporary protection against buffer overrun. + // This needs to be replaced with frame fragmentation. + if (msgBuffer.available () < THRESHOLD) + break; + } + + msgBuffer.putOctet ('X'); // End-of-message + msgBuffer.putOctet (0); + msgBuffer.putShort (0); + msgBuffer.putLong (8); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + + AMQFrame method (0, MessageTransferBody(ProtocolVersion(), + 0, "qpid.management", 0, 0)); + AMQFrame header (0, AMQHeaderBody()); + AMQFrame content; + + content.setBody(AMQContentBody()); + content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + mExchange->route (deliverable, "mgmt", 0); + + // Delete flagged objects + for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + { + managementObjects.erase (*iter); + } + deleteList.clear (); +} + +void ManagementAgent::dispatchCommand (Deliverable& deliverable, + const string& routingKey, + const FieldTable* /*args*/) +{ + size_t pos, start; + Message& msg = ((DeliverableMessage&) deliverable).getMessage (); + uint32_t contentSize; + + if (routingKey.compare (0, 7, "method.") != 0) + { + QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); + return; + } + + start = 7; + if (routingKey.length () == start) + { + QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); + return; + } + + pos = routingKey.find ('.', start); + if (pos == string::npos || routingKey.length () == pos + 1) + { + QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); + return; + } + + string packageName = routingKey.substr (start, pos - start); + + start = pos + 1; + pos = routingKey.find ('.', start); + if (pos == string::npos || routingKey.length () == pos + 1) + { + QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); + return; + } + + string className = routingKey.substr (start, pos - start); + + start = pos + 1; + string methodName = routingKey.substr (start, routingKey.length () - start); + + QPID_LOG (debug, "Dispatch package: " << packageName << ", class: " + << className << ", method: " << methodName); + + contentSize = msg.encodedContentSize (); + if (contentSize < 8 || contentSize > 65536) + return; + + char *inMem = new char[contentSize]; + char outMem[4096]; // TODO Fix This + Buffer inBuffer (inMem, contentSize); + Buffer outBuffer (outMem, 4096); + + msg.encodeContent (inBuffer); + inBuffer.reset (); + + uint32_t methodId = inBuffer.getLong (); + uint64_t objId = inBuffer.getLongLong (); + string replyTo; + + inBuffer.getShortString (replyTo); + + QPID_LOG (debug, " len = " << contentSize << ", methodId = " << + methodId << ", objId = " << objId); + + outBuffer.putLong (methodId); + + ManagementObjectMap::iterator iter = managementObjects.find (objId); + if (iter == managementObjects.end ()) + { + outBuffer.putLong (2); + outBuffer.putShortString ("Invalid Object Id"); + } + else + { + iter->second->doMethod (methodName, inBuffer, outBuffer); + } + + Message::shared_ptr outMsg (new Message ()); + uint32_t msgSize = 4096 - outBuffer.available (); + outBuffer.reset (); + AMQFrame method (0, MessageTransferBody(ProtocolVersion(), + 0, "amq.direct", 0, 0)); + AMQFrame header (0, AMQHeaderBody()); + AMQFrame content; + + content.setBody(AMQContentBody()); + content.castBody<AMQContentBody>()->decode(outBuffer, msgSize); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + outMsg->getFrames().append(method); + outMsg->getFrames().append(header); + + MessageProperties* props = outMsg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(msgSize); + outMsg->getFrames().append(content); + + DeliverableMessage outDeliverable (outMsg); + dExchange->route (outDeliverable, replyTo, 0); + + free (inMem); +} + diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h new file mode 100644 index 0000000000..e84c0478f3 --- /dev/null +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -0,0 +1,81 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Timer.h" +#include "ManagementObject.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace management { + +class ManagementAgent +{ + private: + + ManagementAgent (uint16_t interval); + + public: + + typedef boost::shared_ptr<ManagementAgent> shared_ptr; + + static shared_ptr getAgent (void); + + void setInterval (uint16_t _interval) { interval = _interval; } + void setExchange (broker::Exchange::shared_ptr mgmtExchange, + broker::Exchange::shared_ptr directExchange); + void addObject (ManagementObject::shared_ptr object); + void clientAdded (void); + void dispatchCommand (broker::Deliverable& msg, + const std::string& routingKey, + const qpid::framing::FieldTable* args); + + private: + + struct Periodic : public broker::TimerTask + { + ManagementAgent& agent; + + Periodic (ManagementAgent& agent, uint32_t seconds); + ~Periodic () {} + void fire (); + }; + + static shared_ptr agent; + ManagementObjectMap managementObjects; + broker::Timer timer; + broker::Exchange::shared_ptr mExchange; + broker::Exchange::shared_ptr dExchange; + uint16_t interval; + uint64_t nextObjectId; + + void PeriodicProcessing (void); +}; + +}} + + + +#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp new file mode 100644 index 0000000000..f18b6fc402 --- /dev/null +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementExchange.h" +#include "qpid/log/Statement.h" + +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementExchange::ManagementExchange (const string& _name) : + Exchange (_name), TopicExchange(_name) {} +ManagementExchange::ManagementExchange (const std::string& _name, + bool _durable, + const FieldTable& _args) : + Exchange (_name, _durable, _args), + TopicExchange(_name, _durable, _args) {} + + +bool ManagementExchange::bind (Queue::shared_ptr queue, + const string& routingKey, + const FieldTable* args) +{ + bool result = TopicExchange::bind (queue, routingKey, args); + + // Notify the management agent that a new management client has bound to the + // exchange. + if (result) + managementAgent->clientAdded (); + + return result; +} + +void ManagementExchange::route (Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + // Intercept management commands + if (routingKey.length () > 7 && + routingKey.substr (0, 7).compare ("method.") == 0) + { + managementAgent->dispatchCommand (msg, routingKey, args); + return; + } + + TopicExchange::route (msg, routingKey, args); +} + +void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) +{ + managementAgent = agent; +} + + +ManagementExchange::~ManagementExchange() {} + +const std::string ManagementExchange::typeName("management"); + diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h new file mode 100644 index 0000000000..6ccdf47182 --- /dev/null +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementExchange_ +#define _ManagementExchange_ + +#include "qpid/broker/TopicExchange.h" +#include "ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementExchange : public virtual TopicExchange +{ + private: + management::ManagementAgent::shared_ptr managementAgent; + + public: + static const std::string typeName; + + ManagementExchange (const string& name); + ManagementExchange (const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); + + virtual std::string getType() const { return typeName; } + + virtual bool bind (Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args); + + virtual void route (Deliverable& msg, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent (management::ManagementAgent::shared_ptr agent); + + virtual ~ManagementExchange(); +}; + + +} +} + +#endif diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp new file mode 100644 index 0000000000..24588b4edd --- /dev/null +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" +#include "ManagementObject.h" + +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::sys; + +void ManagementObject::schemaItem (Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig, + bool isIndex) +{ + uint8_t flags = + (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); + + buf.putOctet (flags); + buf.putOctet (typeCode); + buf.putShortString (name); + buf.putShortString (description); +} + +void ManagementObject::schemaListBegin (Buffer& buf) +{ + schemaItem (buf, TYPE_UINT64, "id", "Object ID", true, true); +} + +void ManagementObject::schemaListEnd (Buffer& buf) +{ + buf.putOctet (FLAG_END); +} + +void ManagementObject::writeTimestamps (Buffer& buf) +{ + buf.putLongLong (uint64_t (Duration (now ()))); + buf.putLongLong (createTime); + buf.putLongLong (destroyTime); + buf.putLongLong (objectId); +} diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h new file mode 100644 index 0000000000..416a477796 --- /dev/null +++ b/cpp/src/qpid/management/ManagementObject.h @@ -0,0 +1,125 @@ +#ifndef _ManagementObject_ +#define _ManagementObject_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" +#include "qpid/sys/Time.h" +#include <qpid/framing/Buffer.h> +#include <boost/shared_ptr.hpp> +#include <map> + +namespace qpid { +namespace management { + +const uint16_t OBJECT_SYSTEM = 1; +const uint16_t OBJECT_BROKER = 2; +const uint16_t OBJECT_VHOST = 3; +const uint16_t OBJECT_QUEUE = 4; +const uint16_t OBJECT_EXCHANGE = 5; +const uint16_t OBJECT_BINDING = 6; +const uint16_t OBJECT_CLIENT = 7; +const uint16_t OBJECT_SESSION = 8; +const uint16_t OBJECT_DESTINATION = 9; +const uint16_t OBJECT_PRODUCER = 10; +const uint16_t OBJECT_CONSUMER = 11; + +class Manageable; + +class ManagementObject +{ + protected: + + uint64_t createTime; + uint64_t destroyTime; + uint64_t objectId; + bool configChanged; + bool instChanged; + bool deleted; + Manageable* coreObject; + + static const uint8_t TYPE_UINT8 = 1; + static const uint8_t TYPE_UINT16 = 2; + static const uint8_t TYPE_UINT32 = 3; + static const uint8_t TYPE_UINT64 = 4; + static const uint8_t TYPE_BOOL = 5; + static const uint8_t TYPE_STRING = 6; + + static const uint8_t FLAG_CONFIG = 0x01; + static const uint8_t FLAG_INDEX = 0x02; + static const uint8_t FLAG_END = 0x80; + + void schemaItem (qpid::framing::Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig = false, + bool isIndex = false); + void schemaListBegin (qpid::framing::Buffer& buf); + void schemaListEnd (qpid::framing::Buffer& buf); + void writeTimestamps (qpid::framing::Buffer& buf); + + public: + typedef boost::shared_ptr<ManagementObject> shared_ptr; + + ManagementObject (Manageable* _core) : + destroyTime(0), objectId (0), configChanged(true), + instChanged(true), deleted(false), coreObject(_core) + { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } + virtual ~ManagementObject () {} + + virtual uint16_t getObjectType (void) = 0; + virtual std::string getObjectName (void) = 0; + virtual void writeSchema (qpid::framing::Buffer& buf) = 0; + virtual void writeConfig (qpid::framing::Buffer& buf) = 0; + virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; + virtual bool getSchemaNeeded (void) = 0; + virtual void setSchemaNeeded (void) = 0; + virtual void doMethod (std::string methodName, + qpid::framing::Buffer& inBuf, + qpid::framing::Buffer& outBuf) = 0; + + void setObjectId (uint64_t oid) { objectId = oid; } + uint64_t getObjectId (void) { return objectId; } + inline bool getConfigChanged (void) { return configChanged; } + virtual bool getInstChanged (void) { return instChanged; } + inline void setAllChanged (void) + { + configChanged = true; + instChanged = true; + } + + inline void resourceDestroy (void) { + destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); + deleted = true; + } + bool isDeleted (void) { return deleted; } + +}; + + typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap; + +}} + + + +#endif /*!_ManagementObject_*/ diff --git a/cpp/src/qpid/management/Queue.cpp b/cpp/src/qpid/management/Queue.cpp new file mode 100644 index 0000000000..3c82877ebd --- /dev/null +++ b/cpp/src/qpid/management/Queue.cpp @@ -0,0 +1,189 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "Manageable.h" +#include "Queue.h" + +using namespace qpid::management; +using namespace qpid::sys; +using namespace qpid::framing; + +bool Queue::schemaNeeded = true; + +Queue::Queue (Manageable* _core, Manageable* _parent, + const std::string& _name, + bool _durable, bool _autoDelete) : + ManagementObject(_core), name(_name), + durable(_durable), autoDelete(_autoDelete) +{ + vhostRef = _parent->GetManagementObject ()->getObjectId (); + + msgTotalEnqueues = 0; + msgTotalDequeues = 0; + msgTxEnqueues = 0; + msgTxDequeues = 0; + msgPersistEnqueues = 0; + msgPersistDequeues = 0; + + msgDepth = 0; + msgDepthLow = 0; + msgDepthHigh = 0; + + byteTotalEnqueues = 0; + byteTotalDequeues = 0; + byteTxEnqueues = 0; + byteTxDequeues = 0; + bytePersistEnqueues = 0; + bytePersistDequeues = 0; + + byteDepth = 0; + byteDepthLow = 0; + byteDepthHigh = 0; + + enqueueTxStarts = 0; + enqueueTxCommits = 0; + enqueueTxRejects = 0; + dequeueTxStarts = 0; + dequeueTxCommits = 0; + dequeueTxRejects = 0; + + enqueueTxCount = 0; + enqueueTxCountLow = 0; + enqueueTxCountHigh = 0; + + dequeueTxCount = 0; + dequeueTxCountLow = 0; + dequeueTxCountHigh = 0; + + consumers = 0; + consumersLow = 0; + consumersHigh = 0; +} + +Queue::~Queue () {} + +void Queue::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaListBegin (buf); + schemaItem (buf, TYPE_UINT64, "vhostRef", "Virtual Host Ref", true); + schemaItem (buf, TYPE_STRING, "name", "Queue Name", true); + schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); + schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); + schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTxnDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); + schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); + schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); + schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTxnDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); + schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); + schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); + schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts", "Total enqueue transactions started "); + schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits", "Total enqueue transactions committed"); + schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects", "Total enqueue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "enqueueTxnCount", "Current pending enqueue transactions"); + schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow", "Low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this interval"); + schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts", "Total dequeue transactions started "); + schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits", "Total dequeue transactions committed"); + schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects", "Total dequeue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "dequeueTxnCount", "Current pending dequeue transactions"); + schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow", "Transaction low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); + schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); + schemaListEnd (buf); +} + +void Queue::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putLongLong (vhostRef); + buf.putShortString (name); + buf.putOctet (durable ? 1 : 0); + buf.putOctet (autoDelete ? 1 : 0); +} + +void Queue::writeInstrumentation (Buffer& buf) +{ + instChanged = false; + + writeTimestamps (buf); + buf.putLongLong (msgTotalEnqueues); + buf.putLongLong (msgTotalDequeues); + buf.putLongLong (msgTxEnqueues); + buf.putLongLong (msgTxDequeues); + buf.putLongLong (msgPersistEnqueues); + buf.putLongLong (msgPersistDequeues); + buf.putLong (msgDepth); + buf.putLong (msgDepthLow); + buf.putLong (msgDepthHigh); + buf.putLongLong (byteTotalEnqueues); + buf.putLongLong (byteTotalDequeues); + buf.putLongLong (byteTxEnqueues); + buf.putLongLong (byteTxDequeues); + buf.putLongLong (bytePersistEnqueues); + buf.putLongLong (bytePersistDequeues); + buf.putLong (byteDepth); + buf.putLong (byteDepthLow); + buf.putLong (byteDepthHigh); + buf.putLongLong (enqueueTxStarts); + buf.putLongLong (enqueueTxCommits); + buf.putLongLong (enqueueTxRejects); + buf.putLong (enqueueTxCount); + buf.putLong (enqueueTxCountLow); + buf.putLong (enqueueTxCountHigh); + buf.putLongLong (dequeueTxStarts); + buf.putLongLong (dequeueTxCommits); + buf.putLongLong (dequeueTxRejects); + buf.putLong (dequeueTxCount); + buf.putLong (dequeueTxCountLow); + buf.putLong (dequeueTxCountHigh); + buf.putLong (consumers); + buf.putLong (consumersLow); + buf.putLong (consumersHigh); + + msgDepthLow = msgDepth; + msgDepthHigh = msgDepth; + byteDepthLow = byteDepth; + byteDepthHigh = byteDepth; + enqueueTxCountLow = enqueueTxCount; + enqueueTxCountHigh = enqueueTxCount; + dequeueTxCountLow = dequeueTxCount; + dequeueTxCountHigh = dequeueTxCount; + consumersLow = consumers; + consumersHigh = consumers; +} diff --git a/cpp/src/qpid/management/Queue.h b/cpp/src/qpid/management/Queue.h new file mode 100644 index 0000000000..3a7fdf5263 --- /dev/null +++ b/cpp/src/qpid/management/Queue.h @@ -0,0 +1,184 @@ +#ifndef _ManagementQueue_ +#define _ManagementQueue_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" + +namespace qpid { +namespace management { + +const uint32_t MSG_MASK_TX = 1; // Transactional message +const uint32_t MSG_MASK_PERSIST = 2; // Persistent message + +class Queue : public ManagementObject +{ + private: + + static bool schemaNeeded; + + uint64_t vhostRef; + std::string name; + bool durable; + bool autoDelete; + + uint64_t msgTotalEnqueues; // Total messages enqueued + uint64_t msgTotalDequeues; // Total messages dequeued + uint64_t msgTxEnqueues; // Transactional messages enqueued + uint64_t msgTxDequeues; // Transactional messages dequeued + uint64_t msgPersistEnqueues; // Persistent messages enqueued + uint64_t msgPersistDequeues; // Persistent messages dequeued + + uint32_t msgDepth; // Current size of queue in messages + uint32_t msgDepthLow; // Low-water queue size, this interval + uint32_t msgDepthHigh; // High-water queue size, this interval + + uint64_t byteTotalEnqueues; // Total messages enqueued + uint64_t byteTotalDequeues; // Total messages dequeued + uint64_t byteTxEnqueues; // Transactional messages enqueued + uint64_t byteTxDequeues; // Transactional messages dequeued + uint64_t bytePersistEnqueues; // Persistent messages enqueued + uint64_t bytePersistDequeues; // Persistent messages dequeued + + uint32_t byteDepth; // Current size of queue in bytes + uint32_t byteDepthLow; // Low-water mark this interval + uint32_t byteDepthHigh; // High-water mark this interval + + uint64_t enqueueTxStarts; // Total enqueue transactions started + uint64_t enqueueTxCommits; // Total enqueue transactions committed + uint64_t enqueueTxRejects; // Total enqueue transactions rejected + + uint32_t enqueueTxCount; // Current pending enqueue transactions + uint32_t enqueueTxCountLow; // Low water mark this interval + uint32_t enqueueTxCountHigh; // High water mark this interval + + uint64_t dequeueTxStarts; // Total dequeue transactions started + uint64_t dequeueTxCommits; // Total dequeue transactions committed + uint64_t dequeueTxRejects; // Total dequeue transactions rejected + + uint32_t dequeueTxCount; // Current pending dequeue transactions + uint32_t dequeueTxCountLow; // Low water mark this interval + uint32_t dequeueTxCountHigh; // High water mark this interval + + uint32_t consumers; // Current consumers on queue + uint32_t consumersLow; // Low water mark this interval + uint32_t consumersHigh; // High water mark this interval + + uint16_t getObjectType (void) { return OBJECT_QUEUE; } + std::string getObjectName (void) { return "queue"; } + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& buf); + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string /*methodName*/, + qpid::framing::Buffer& /*inBuf*/, + qpid::framing::Buffer& /*outBuf*/) {} + + inline void adjustQueueHiLo (void){ + if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; + if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; + + if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; + if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; + instChanged = true; + } + + inline void adjustTxHiLo (void){ + if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; + if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; + if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; + if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; + instChanged = true; + } + + inline void adjustConsumerHiLo (void){ + if (consumers > consumersHigh) consumersHigh = consumers; + if (consumers < consumersLow) consumersLow = consumers; + instChanged = true; + } + + public: + + typedef boost::shared_ptr<Queue> shared_ptr; + + Queue (Manageable* coreObject, Manageable* parentObject, + const std::string& name, bool durable, bool autoDelete); + ~Queue (void); + + // The following mask contents are used to describe enqueued or dequeued + // messages when counting statistics. + + inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalEnqueues++; + byteTotalEnqueues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxEnqueues++; + byteTxEnqueues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistEnqueues++; + bytePersistEnqueues += bytes; + } + + msgDepth++; + byteDepth += bytes; + adjustQueueHiLo (); + } + + inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalDequeues++; + byteTotalDequeues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxDequeues++; + byteTxDequeues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistDequeues++; + bytePersistDequeues += bytes; + } + + msgDepth--; + byteDepth -= bytes; + adjustQueueHiLo (); + } + + inline void incConsumers (void){ + consumers++; + adjustConsumerHiLo (); + } + + inline void decConsumers (void){ + consumers--; + adjustConsumerHiLo (); + } +}; + +}} + + + +#endif /*!_ManagementQueue_*/ diff --git a/cpp/src/qpid/management/Vhost.cpp b/cpp/src/qpid/management/Vhost.cpp new file mode 100644 index 0000000000..effcb1599c --- /dev/null +++ b/cpp/src/qpid/management/Vhost.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" +#include "Vhost.h" + +using namespace qpid::management; +using namespace qpid::sys; +using namespace qpid::framing; + +bool Vhost::schemaNeeded = true; + +Vhost::Vhost (Manageable* _core, Manageable* _parent) : + ManagementObject (_core), name("/") +{ + brokerRef = _parent->GetManagementObject ()->getObjectId (); +} + +Vhost::~Vhost () {} + +void Vhost::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaListBegin (buf); + schemaItem (buf, TYPE_UINT64, "brokerRef", "Broker Reference" , true); + schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true); + schemaListEnd (buf); +} + +void Vhost::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putLongLong (brokerRef); + buf.putShortString (name); +} + diff --git a/cpp/src/qpid/management/Vhost.h b/cpp/src/qpid/management/Vhost.h new file mode 100644 index 0000000000..5fc5a2870b --- /dev/null +++ b/cpp/src/qpid/management/Vhost.h @@ -0,0 +1,65 @@ +#ifndef _ManagementVhost_ +#define _ManagementVhost_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" +#include "ManagementObject.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace management { + +class Vhost : public ManagementObject +{ + public: + + typedef boost::shared_ptr<Vhost> shared_ptr; + + Vhost (Manageable* coreObject, Manageable* parentObject); + ~Vhost (void); + + private: + + static bool schemaNeeded; + + uint64_t brokerRef; + std::string name; + + uint16_t getObjectType (void) { return OBJECT_VHOST; } + std::string getObjectName (void) { return "vhost"; } + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string /*methodName*/, + qpid::framing::Buffer& /*inBuf*/, + qpid::framing::Buffer& /*outBuf*/) {} + + inline bool getInstChanged (void) { return false; } +}; + +}} + + +#endif /*!_ManagementVhost_*/ |
