diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-13 00:34:09 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-13 00:34:09 +0000 |
| commit | b4a562164bfbf627c9bf9e802ea2baa33d12521a (patch) | |
| tree | e3e1d0ed46174cf61e15569659c97a3c93ac6b97 /cpp/src/qpid | |
| parent | 6128b62ed47c825dba3f7a36ccdb60b55044ea2e (diff) | |
| download | qpid-python-b4a562164bfbf627c9bf9e802ea2baa33d12521a.tar.gz | |
Patch QPID-680 from tross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594364 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
38 files changed, 1775 insertions, 1364 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 68590d3331..d61100d255 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,7 +28,8 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" -#include "management/ManagementExchange.h" +#include "qpid/management/ManagementExchange.h" +#include "qpid/management/ArgsBrokerEcho.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" @@ -50,6 +51,11 @@ using qpid::sys::Acceptor; using qpid::framing::HandlerUpdater; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::management::ArgsBrokerEcho; namespace qpid { namespace broker { @@ -89,7 +95,7 @@ Broker::Options::Options(const std::string& name) : ("store-async", optValue(storeAsync,"yes|no"), "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.") ("store-force", optValue(storeForce,"yes|no"), - "Force changing modes of store, will delete all existing data if mode is change. Be SHURE you want to do this") + "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this") ("mgmt,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), @@ -115,33 +121,23 @@ Broker::Broker(const Broker::Options& conf) : sessionManager(conf.ack) { if(conf.enableMgmt){ - managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); + managementAgent = ManagementAgent::getAgent (); + managementAgent->setInterval (conf.mgmtPubInterval); - mgmtObject = ManagementObjectBroker::shared_ptr (new ManagementObjectBroker (conf)); - managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtObject)); + mgmtObject = management::Broker::shared_ptr (new management::Broker (this, conf)); + managementAgent->addObject (mgmtObject); - // Since there is currently no support for virtual hosts, a management object - // representing the implied single virtual host is added here. - mgmtVhostObject = ManagementObjectVhost::shared_ptr - (new ManagementObjectVhost (mgmtObject->getObjectId (), conf)); - managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject)); + // Since there is currently no support for virtual hosts, a placeholder object + // representing the implied single virtual host is added here to keep the + // management schema correct. + Vhost* vhost = new Vhost (this); + vhostObject = Vhost::shared_ptr (vhost); - queues.setManagementAgent (managementAgent); - queues.setManagementVhost (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject)); + queues.setParent (vhost); } exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - if(conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); - exchanges.declare(qpid_management, ManagementExchange::typeName); - Exchange::shared_ptr mExchange = exchanges.get (qpid_management); - managementAgent->setExchange (mExchange); - dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); - } - else - QPID_LOG(info, "Management not enabled"); - if(store.get()) { if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){ throw Exception( "Existing Journal in different mode, backup/move existing data \ @@ -158,6 +154,17 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_fanout, FanOutExchange::typeName); declareStandardExchange(amq_match, HeadersExchange::typeName); + if(conf.enableMgmt) { + QPID_LOG(info, "Management enabled"); + exchanges.declare(qpid_management, ManagementExchange::typeName); + Exchange::shared_ptr mExchange = exchanges.get (qpid_management); + Exchange::shared_ptr dExchange = exchanges.get (amq_direct); + managementAgent->setExchange (mExchange, dExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); + } + else + QPID_LOG(info, "Management not enabled"); + // Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); for (Plugin::Plugins::const_iterator i = plugins.begin(); @@ -236,5 +243,17 @@ void Broker::update(ChannelId channel, FrameHandler::Chains& chains) { channel, boost::ref(chains))); } +ManagementObject::shared_ptr Broker::GetManagementObject(void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Broker::ManagementMethod (uint32_t /*methodId*/, + Args& /*_args*/) +{ + QPID_LOG (debug, "Broker::ManagementMethod"); + return Manageable::STATUS_OK; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 6f4cc97936..0980c970d2 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -30,9 +30,10 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" -#include "management/ManagementAgent.h" -#include "management/ManagementObjectBroker.h" -#include "management/ManagementObjectVhost.h" +#include "Vhost.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/management/Broker.h" #include "qpid/Options.h" #include "qpid/Plugin.h" #include "qpid/Url.h" @@ -55,7 +56,7 @@ namespace broker { /** * A broker instance. */ -class Broker : public sys::Runnable, public Plugin::Target +class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable { public: struct Options : public qpid::Options { @@ -114,7 +115,10 @@ class Broker : public sys::Runnable, public Plugin::Target DtxManager& getDtxManager() { return dtxManager; } SessionManager& getSessionManager() { return sessionManager; } - ManagementAgent::shared_ptr getManagementAgent() { return managementAgent; } + + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); private: sys::Acceptor& getAcceptor() const; @@ -131,9 +135,9 @@ class Broker : public sys::Runnable, public Plugin::Target DtxManager dtxManager; HandlerUpdaters handlerUpdaters; SessionManager sessionManager; - ManagementAgent::shared_ptr managementAgent; - ManagementObjectBroker::shared_ptr mgmtObject; - ManagementObjectVhost::shared_ptr mgmtVhostObject; + management::ManagementAgent::shared_ptr managementAgent; + management::Broker::shared_ptr mgmtObject; + Vhost::shared_ptr vhostObject; static MessageStore* createStore(const Options& config); void declareStandardExchange(const std::string& name, const std::string& type); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 35660cfa0b..2f58e23c23 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -23,7 +23,7 @@ #include "FanOutExchange.h" #include "HeadersExchange.h" #include "TopicExchange.h" -#include "management/ManagementExchange.h" +#include "qpid/management/ManagementExchange.h" #include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 0065ed397c..fe412fd77b 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -89,6 +89,13 @@ void Message::encode(framing::Buffer& buffer) const frames.map_if(f2, TypeFilter(CONTENT_BODY)); } +void Message::encodeContent(framing::Buffer& buffer) const +{ + //encode the payload of each content frame + EncodeBody f2(buffer); + frames.map_if(f2, TypeFilter(CONTENT_BODY)); +} + uint32_t Message::encodedSize() const { return encodedHeaderSize() + encodedContentSize(); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 913cc759bc..7fe8628bf5 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -91,6 +91,7 @@ public: uint32_t getRequiredCredit() const; void encode(framing::Buffer& buffer) const; + void encodeContent(framing::Buffer& buffer) const; /** * @returns the size of the buffer needed to encode this diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 5745c85331..919343b152 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -36,10 +36,15 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, - const ConnectionToken* const _owner) : + const ConnectionToken* const _owner, + Manageable* parent) : name(_name), autodelete(_autodelete), @@ -50,9 +55,21 @@ Queue::Queue(const string& _name, bool _autodelete, serializer(false), dispatchCallback(*this) { + if (parent != 0) + { + mgmtObject = management::Queue::shared_ptr + (new management::Queue (this, parent, _name, _store != 0, _autodelete)); + + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject); + } } -Queue::~Queue(){} +Queue::~Queue() +{ + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} void Queue::notifyDurableIOComplete() { @@ -79,7 +96,7 @@ void Queue::deliver(Message::shared_ptr& msg){ mgmtObject->enqueue (msg->contentSize ()); }else { if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -92,7 +109,7 @@ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -102,10 +119,10 @@ void Queue::recover(Message::shared_ptr& msg){ void Queue::process(Message::shared_ptr& msg){ - uint32_t mask = MSG_MASK_TX; + uint32_t mask = management::MSG_MASK_TX; if (msg->isPersistent ()) - mask |= MSG_MASK_PERSIST; + mask |= management::MSG_MASK_PERSIST; push(msg); if (mgmtObject != 0) @@ -327,7 +344,7 @@ QueuedMessage Queue::dequeue(){ uint32_t mask = 0; if (msg.payload->isPersistent ()) - mask |= MSG_MASK_PERSIST; + mask |= management::MSG_MASK_PERSIST; mgmtObject->dequeue (msg.payload->contentSize (), mask); } @@ -571,3 +588,14 @@ void Queue::DispatchFunctor::operator()() if (sync) sync->completed(); } + +ManagementObject::shared_ptr Queue::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/, + Args& /*args*/) +{ + return Manageable::STATUS_OK; +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index a1bbe275da..4439ecbcc1 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -35,7 +35,8 @@ #include "PersistableQueue.h" #include "QueuePolicy.h" #include "QueueBindings.h" -#include "management/ManagementObjectQueue.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Queue.h" namespace qpid { namespace broker { @@ -59,7 +60,7 @@ namespace qpid { * registered consumers or be stored until dequeued or until one * or more consumers registers. */ - class Queue : public PersistableQueue { + class Queue : public PersistableQueue, public management::Manageable { typedef std::vector<Consumer::ptr> Consumers; typedef std::deque<QueuedMessage> Messages; @@ -94,7 +95,7 @@ namespace qpid { qpid::sys::Serializer<DispatchFunctor> serializer; DispatchFunctor dispatchCallback; framing::SequenceNumber sequence; - ManagementObjectQueue::shared_ptr mgmtObject; + management::Queue::shared_ptr mgmtObject; void pop(); void push(Message::shared_ptr& msg); @@ -122,7 +123,8 @@ namespace qpid { Queue(const string& name, bool autodelete = false, MessageStore* const store = 0, - const ConnectionToken* const owner = 0); + const ConnectionToken* const owner = 0, + Manageable* parent = 0); ~Queue(); void create(const qpid::framing::FieldTable& settings); @@ -130,8 +132,6 @@ namespace qpid { void destroy(); void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args); void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); - void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObject = mgmt; } - ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObject; } bool acquire(const QueuedMessage& msg); @@ -203,6 +203,11 @@ namespace qpid { static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer); static void tryAutoDelete(Broker& broker, Queue::shared_ptr); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); }; } } diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index bc7f00c7ef..f246c653ea 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -19,8 +19,6 @@ * */ #include "QueueRegistry.h" -#include "management/ManagementAgent.h" -#include "management/ManagementObjectQueue.h" #include "qpid/log/Statement.h" #include <sstream> #include <assert.h> @@ -29,7 +27,7 @@ using namespace qpid::broker; using namespace qpid::sys; QueueRegistry::QueueRegistry(MessageStore* const _store) : - counter(1), store(_store) {} + counter(1), store(_store), parent(0) {} QueueRegistry::~QueueRegistry(){} @@ -43,17 +41,9 @@ QueueRegistry::declare(const string& declareName, bool durable, QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent)); queues[name] = queue; - if (managementAgent){ - ManagementObjectQueue::shared_ptr mgmtObject - (new ManagementObjectQueue (managementVhost->getObjectId (), name, durable, autoDelete)); - - queue->setMgmt (mgmtObject); - managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject)); - } - return std::pair<Queue::shared_ptr, bool>(queue, true); } else { return std::pair<Queue::shared_ptr, bool>(i->second, false); @@ -61,16 +51,6 @@ QueueRegistry::declare(const string& declareName, bool durable, } void QueueRegistry::destroyLH (const string& name){ - if (managementAgent){ - ManagementObjectQueue::shared_ptr mgmtObject; - QueueMap::iterator i = queues.find(name); - - if (i != queues.end()){ - mgmtObject = i->second->getMgmt (); - mgmtObject->resourceDestroy (); - } - } - queues.erase(name); } @@ -105,18 +85,3 @@ string QueueRegistry::generateName(){ MessageStore* const QueueRegistry::getStore() const { return store; } - -void QueueRegistry::setManagementAgent (ManagementAgent::shared_ptr agent) -{ - managementAgent = agent; -} - -ManagementAgent::shared_ptr QueueRegistry::getManagementAgent (void) -{ - return managementAgent; -} - -void QueueRegistry::setManagementVhost (ManagementObject::shared_ptr vhost) -{ - managementVhost = vhost; -} diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 6fc90a5527..653a27b1d8 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -24,7 +24,7 @@ #include <map> #include "qpid/sys/Mutex.h" #include "Queue.h" -#include "management/ManagementAgent.h" +#include "qpid/management/Manageable.h" namespace qpid { namespace broker { @@ -89,22 +89,19 @@ class QueueRegistry{ * Return the message store used. */ MessageStore* const getStore() const; - + /** - * Set/Get the ManagementAgent in use. + * Register the manageable parent for declared queues */ - void setManagementAgent (ManagementAgent::shared_ptr agent); - ManagementAgent::shared_ptr getManagementAgent (void); - void setManagementVhost (ManagementObject::shared_ptr vhost); - + void setParent (management::Manageable* _parent) { parent = _parent; } + private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; qpid::sys::RWlock lock; int counter; MessageStore* const store; - ManagementAgent::shared_ptr managementAgent; - ManagementObject::shared_ptr managementVhost; + management::Manageable* parent; }; diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp new file mode 100644 index 0000000000..bf0521904f --- /dev/null +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -0,0 +1,37 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "Vhost.h" +#include "qpid/management/ManagementAgent.h" + +using namespace qpid::broker; +using qpid::management::ManagementAgent; + +Vhost::Vhost (management::Manageable* parentBroker) +{ + if (parentBroker != 0) + { + mgmtObject = management::Vhost::shared_ptr + (new management::Vhost (this, parentBroker)); + + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject); + } +} + diff --git a/cpp/src/qpid/broker/Vhost.h b/cpp/src/qpid/broker/Vhost.h new file mode 100644 index 0000000000..b702dcebf0 --- /dev/null +++ b/cpp/src/qpid/broker/Vhost.h @@ -0,0 +1,51 @@ +#ifndef _Vhost_ +#define _Vhost_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" +#include "qpid/management/Vhost.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class Vhost : public management::Manageable +{ + private: + + management::Vhost::shared_ptr mgmtObject; + + public: + + typedef boost::shared_ptr<Vhost> shared_ptr; + + Vhost (management::Manageable* parentBroker); + + management::ManagementObject::shared_ptr GetManagementObject (void) const + { return mgmtObject; } + + management::Manageable::status_t ManagementMethod (uint32_t, management::Args&) + { return management::Manageable::STATUS_OK; } +}; + +}} + +#endif /*!_Vhost_*/ diff --git a/cpp/src/qpid/broker/management/ManagementAgent.cpp b/cpp/src/qpid/broker/management/ManagementAgent.cpp index c52e330ff2..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementAgent.cpp +++ b/cpp/src/qpid/broker/management/ManagementAgent.cpp @@ -1,234 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementAgent.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/log/Statement.h" -#include <qpid/broker/Message.h> -#include <qpid/broker/MessageDelivery.h> -#include <qpid/framing/AMQFrame.h> -#include <list> - -using namespace qpid::framing; -using namespace qpid::broker; -using namespace qpid::sys; - -ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval), nextObjectId(1) -{ - timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); -} - -void ManagementAgent::setExchange (Exchange::shared_ptr _exchange) -{ - exchange = _exchange; -} - -void ManagementAgent::addObject (ManagementObject::shared_ptr object) -{ - object->setObjectId (nextObjectId++); - managementObjects.push_back (object); - QPID_LOG(info, "Management Object Added"); -} - -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} - -void ManagementAgent::Periodic::fire () -{ - agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval))); - agent.PeriodicProcessing (); -} - -void ManagementAgent::clientAdded (void) -{ - for (ManagementObjectVector::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = *iter; - object->setAllChanged (); - object->setSchemaNeeded (); - } -} - -void ManagementAgent::PeriodicProcessing (void) -{ -#define BUFSIZE 65536 -#define THRESHOLD 16384 - char msgChars[BUFSIZE]; - Buffer msgBuffer (msgChars, BUFSIZE); - uint32_t contentSize; - std::list<uint32_t> deleteList; - - if (managementObjects.empty ()) - return; - - Message::shared_ptr msg (new Message ()); - - // Build the magic number for the management message. - msgBuffer.putOctet ('A'); - msgBuffer.putOctet ('M'); - msgBuffer.putOctet ('0'); - msgBuffer.putOctet ('1'); - - for (uint32_t idx = 0; idx < managementObjects.size (); idx++) - { - ManagementObject::shared_ptr object = managementObjects[idx]; - - if (object->getSchemaNeeded ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('S'); // opcode = Schema Record - msgBuffer.putOctet (0); // content-class = N/A - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeSchema (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->getConfigChanged ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('C'); // content-class = Configuration - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeConfig (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->getInstChanged ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('I'); // content-class = Instrumentation - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeInstrumentation (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->isDeleted ()) - deleteList.push_back (idx); - - // Temporary protection against buffer overrun. - // This needs to be replaced with frame fragmentation. - if (msgBuffer.available () < THRESHOLD) - break; - } - - msgBuffer.putOctet ('X'); // End-of-message - msgBuffer.putOctet (0); - msgBuffer.putShort (0); - msgBuffer.putLong (8); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - - AMQFrame method (0, MessageTransferBody(ProtocolVersion(), - 0, "qpid.management", 0, 0)); - AMQFrame header (0, AMQHeaderBody()); - AMQFrame content; - - content.setBody(AMQContentBody()); - content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize); - - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(contentSize); - msg->getFrames().append(content); - - DeliverableMessage deliverable (msg); - exchange->route (deliverable, "mgmt", 0); - - // Delete flagged objects - for (std::list<uint32_t>::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); - iter++) - { - managementObjects.erase (managementObjects.begin () + *iter); - } - deleteList.clear (); -} - -void ManagementAgent::dispatchCommand (Deliverable& /*msg*/, - const string& routingKey, - const FieldTable* /*args*/) -{ - size_t pos, start; - - if (routingKey.compare (0, 7, "method.") != 0) - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } - - start = 7; - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing class-key in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-key in routing key: " << routingKey); - return; - } - - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - - string methodName = routingKey.substr (start, routingKey.length () - start); - - QPID_LOG (debug, "Dispatch class: " << className << ", method: " << methodName); -} - diff --git a/cpp/src/qpid/broker/management/ManagementAgent.h b/cpp/src/qpid/broker/management/ManagementAgent.h index e699228601..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementAgent.h +++ b/cpp/src/qpid/broker/management/ManagementAgent.h @@ -1,74 +0,0 @@ -#ifndef _ManagementAgent_ -#define _ManagementAgent_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/Options.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Timer.h" -#include "ManagementObject.h" -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace broker { - - -class ManagementAgent -{ - public: - - typedef boost::shared_ptr<ManagementAgent> shared_ptr; - - ManagementAgent (uint16_t interval); - - void setExchange (Exchange::shared_ptr exchange); - void addObject (ManagementObject::shared_ptr object); - void clientAdded (void); - void dispatchCommand (Deliverable& msg, - const string& routingKey, - const qpid::framing::FieldTable* args); - - private: - - struct Periodic : public TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - ~Periodic () {} - void fire (); - }; - - ManagementObjectVector managementObjects; - Timer timer; - Exchange::shared_ptr exchange; - uint16_t interval; - uint32_t nextObjectId; - - void PeriodicProcessing (void); -}; - -}} - - - -#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/broker/management/ManagementExchange.cpp b/cpp/src/qpid/broker/management/ManagementExchange.cpp index 5d829477ba..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementExchange.cpp +++ b/cpp/src/qpid/broker/management/ManagementExchange.cpp @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementExchange.h" -#include "qpid/log/Statement.h" - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -ManagementExchange::ManagementExchange (const string& _name) : - Exchange (_name), TopicExchange(_name) {} -ManagementExchange::ManagementExchange (const std::string& _name, - bool _durable, - const FieldTable& _args) : - Exchange (_name, _durable, _args), - TopicExchange(_name, _durable, _args) {} - - -bool ManagementExchange::bind (Queue::shared_ptr queue, - const string& routingKey, - const FieldTable* args) -{ - bool result = TopicExchange::bind (queue, routingKey, args); - - // Notify the management agent that a new management client has bound to the - // exchange. - if (result) - managementAgent->clientAdded (); - - return result; -} - -void ManagementExchange::route (Deliverable& msg, - const string& routingKey, - const FieldTable* args) -{ - // Intercept management commands - if (routingKey.length () > 7 && - routingKey.substr (0, 7).compare ("method.") == 0) - { - managementAgent->dispatchCommand (msg, routingKey, args); - return; - } - - TopicExchange::route (msg, routingKey, args); -} - -void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) -{ - managementAgent = agent; -} - - -ManagementExchange::~ManagementExchange() {} - -const std::string ManagementExchange::typeName("management"); - diff --git a/cpp/src/qpid/broker/management/ManagementExchange.h b/cpp/src/qpid/broker/management/ManagementExchange.h index c38f38d0a1..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementExchange.h +++ b/cpp/src/qpid/broker/management/ManagementExchange.h @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ManagementExchange_ -#define _ManagementExchange_ - -#include "qpid/broker/TopicExchange.h" -#include "ManagementAgent.h" - -namespace qpid { -namespace broker { - -class ManagementExchange : public virtual TopicExchange -{ - private: - ManagementAgent::shared_ptr managementAgent; - - public: - static const std::string typeName; - - ManagementExchange (const string& name); - ManagementExchange (const string& _name, bool _durable, - const qpid::framing::FieldTable& _args); - - virtual std::string getType() const { return typeName; } - - virtual bool bind (Queue::shared_ptr queue, - const string& routingKey, - const qpid::framing::FieldTable* args); - - virtual void route (Deliverable& msg, - const string& routingKey, - const qpid::framing::FieldTable* args); - - void setManagmentAgent (ManagementAgent::shared_ptr agent); - - virtual ~ManagementExchange(); -}; - - -} -} - -#endif diff --git a/cpp/src/qpid/broker/management/ManagementObject.cpp b/cpp/src/qpid/broker/management/ManagementObject.cpp index 1c693d6b92..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObject.cpp +++ b/cpp/src/qpid/broker/management/ManagementObject.cpp @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" - -using namespace qpid::framing; -using namespace qpid::broker; -using namespace qpid::sys; - -void ManagementObject::schemaItem (Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig, - bool isIndex) -{ - uint8_t flags = - (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); - - buf.putOctet (flags); - buf.putOctet (typeCode); - buf.putShortString (name); - buf.putShortString (description); -} - -void ManagementObject::schemaListBegin (Buffer& buf) -{ - schemaItem (buf, TYPE_UINT32, "id", "Object ID", true, true); -} - -void ManagementObject::schemaListEnd (Buffer& buf) -{ - buf.putOctet (FLAG_END); -} - -void ManagementObject::writeTimestamps (Buffer& buf) -{ - buf.putLongLong (uint64_t (Duration (now ()))); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); - buf.putLong (objectId); -} diff --git a/cpp/src/qpid/broker/management/ManagementObject.h b/cpp/src/qpid/broker/management/ManagementObject.h index a39ae80c4f..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObject.h +++ b/cpp/src/qpid/broker/management/ManagementObject.h @@ -1,118 +0,0 @@ -#ifndef _ManagementObject_ -#define _ManagementObject_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" -#include <qpid/framing/Buffer.h> -#include <boost/shared_ptr.hpp> -#include <vector> - -namespace qpid { -namespace broker { - -const uint16_t OBJECT_SYSTEM = 1; -const uint16_t OBJECT_BROKER = 2; -const uint16_t OBJECT_VHOST = 3; -const uint16_t OBJECT_QUEUE = 4; -const uint16_t OBJECT_EXCHANGE = 5; -const uint16_t OBJECT_BINDING = 6; -const uint16_t OBJECT_CLIENT = 7; -const uint16_t OBJECT_SESSION = 8; -const uint16_t OBJECT_DESTINATION = 9; -const uint16_t OBJECT_PRODUCER = 10; -const uint16_t OBJECT_CONSUMER = 11; - - -class ManagementObject -{ - protected: - - uint64_t createTime; - uint64_t destroyTime; - uint32_t objectId; - bool configChanged; - bool instChanged; - bool deleted; - - static const uint8_t TYPE_UINT8 = 1; - static const uint8_t TYPE_UINT16 = 2; - static const uint8_t TYPE_UINT32 = 3; - static const uint8_t TYPE_UINT64 = 4; - static const uint8_t TYPE_BOOL = 5; - static const uint8_t TYPE_STRING = 6; - - static const uint8_t FLAG_CONFIG = 0x01; - static const uint8_t FLAG_INDEX = 0x02; - static const uint8_t FLAG_END = 0x80; - - void schemaItem (qpid::framing::Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig = false, - bool isIndex = false); - void schemaListBegin (qpid::framing::Buffer& buf); - void schemaListEnd (qpid::framing::Buffer& buf); - void writeTimestamps (qpid::framing::Buffer& buf); - - public: - typedef boost::shared_ptr<ManagementObject> shared_ptr; - - ManagementObject () : destroyTime(0), objectId (), configChanged(true), - instChanged(true), deleted(false) - { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } - virtual ~ManagementObject () {} - - virtual uint16_t getObjectType (void) = 0; - virtual std::string getObjectName (void) = 0; - virtual void writeSchema (qpid::framing::Buffer& buf) = 0; - virtual void writeConfig (qpid::framing::Buffer& buf) = 0; - virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; - virtual bool getSchemaNeeded (void) = 0; - virtual void setSchemaNeeded (void) = 0; - - void setObjectId (uint32_t oid) { objectId = oid; } - uint32_t getObjectId (void) { return objectId; } - inline bool getConfigChanged (void) { return configChanged; } - virtual bool getInstChanged (void) { return instChanged; } - inline void setAllChanged (void) - { - configChanged = true; - instChanged = true; - } - - inline void resourceDestroy (void) { - destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); - deleted = true; - } - bool isDeleted (void) { return deleted; } - -}; - - typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector; - -}} - - - -#endif /*!_ManagementObject_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp b/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp index 139ea0acca..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp +++ b/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "config.h" -#include "qpid/broker/Broker.h" -#include "ManagementObjectBroker.h" - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; - -bool ManagementObjectBroker::schemaNeeded = true; - -ManagementObjectBroker::ManagementObjectBroker (const Options& _conf) -{ - Broker::Options& conf = (Broker::Options&) _conf; - - sysId = "sysId"; - port = conf.port; - workerThreads = conf.workerThreads; - maxConns = conf.maxConnections; - connBacklog = conf.connectionBacklog; - stagingThreshold = conf.stagingThreshold; - storeLib = conf.store; - asyncStore = conf.storeAsync; - mgmtPubInterval = conf.mgmtPubInterval; - initialDiskPageSize = 0; - initialPagesPerQueue = 0; - clusterName = ""; - version = PACKAGE_VERSION; -} - -ManagementObjectBroker::~ManagementObjectBroker () {} - -void ManagementObjectBroker::writeSchema (Buffer& buf) -{ - schemaNeeded = false; - - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true); - schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true); - schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true); - schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true); - schemaItem (buf, TYPE_UINT16, "connBacklog", - "Connection backlog limit for listening socket", true); - schemaItem (buf, TYPE_UINT32, "stagingThreshold", - "Broker stages messages over this size to disk", true); - schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true); - schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true); - schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true); - schemaItem (buf, TYPE_UINT32, "initialDiskPageSize", - "Number of disk pages allocated for storage", true); - schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue", - "Number of disk pages allocated per queue", true); - schemaItem (buf, TYPE_STRING, "clusterName", - "Name of cluster this server is a member of, zero-length for standalone server", true); - schemaItem (buf, TYPE_STRING, "version", "Running software version", true); - schemaListEnd (buf); -} - -void ManagementObjectBroker::writeConfig (Buffer& buf) -{ - configChanged = false; - - writeTimestamps (buf); - buf.putLong (0); - buf.putShort (port); - buf.putShort (workerThreads); - buf.putShort (maxConns); - buf.putShort (connBacklog); - buf.putLong (stagingThreshold); - buf.putShortString (storeLib); - buf.putOctet (asyncStore ? 1 : 0); - buf.putShort (mgmtPubInterval); - buf.putLong (initialDiskPageSize); - buf.putLong (initialPagesPerQueue); - buf.putShortString (clusterName); - buf.putShortString (version); -} - diff --git a/cpp/src/qpid/broker/management/ManagementObjectBroker.h b/cpp/src/qpid/broker/management/ManagementObjectBroker.h index 25d6bf49b0..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectBroker.h +++ b/cpp/src/qpid/broker/management/ManagementObjectBroker.h @@ -1,73 +0,0 @@ -#ifndef _ManagementObjectBroker_ -#define _ManagementObjectBroker_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" -#include "qpid/Options.h" -#include "boost/shared_ptr.hpp" - -namespace qpid { -namespace broker { - -class ManagementObjectBroker : public ManagementObject -{ - public: - - typedef boost::shared_ptr<ManagementObjectBroker> shared_ptr; - - ManagementObjectBroker (const Options& conf); - ~ManagementObjectBroker (void); - - private: - - static bool schemaNeeded; - - std::string sysId; - uint16_t port; - uint16_t workerThreads; - uint16_t maxConns; - uint16_t connBacklog; - uint32_t stagingThreshold; - std::string storeLib; - bool asyncStore; - uint16_t mgmtPubInterval; - uint32_t initialDiskPageSize; - uint32_t initialPagesPerQueue; - std::string clusterName; - std::string version; - - uint16_t getObjectType (void) { return OBJECT_BROKER; } - std::string getObjectName (void) { return "broker"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - - inline bool getInstChanged (void) { return false; } -}; - -}} - - -#endif /*!_ManagementObjectBroker_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp index 8d8ff8cbf4..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp +++ b/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp @@ -1,183 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObjectQueue.h" - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; - -bool ManagementObjectQueue::schemaNeeded = true; - -ManagementObjectQueue::ManagementObjectQueue (uint32_t _vhostRef, std::string& _name, - bool _durable, bool _autoDelete) : - vhostRef(_vhostRef), name(_name), durable(_durable), autoDelete(_autoDelete) -{ - msgTotalEnqueues = 0; - msgTotalDequeues = 0; - msgTxEnqueues = 0; - msgTxDequeues = 0; - msgPersistEnqueues = 0; - msgPersistDequeues = 0; - - msgDepth = 0; - msgDepthLow = 0; - msgDepthHigh = 0; - - byteTotalEnqueues = 0; - byteTotalDequeues = 0; - byteTxEnqueues = 0; - byteTxDequeues = 0; - bytePersistEnqueues = 0; - bytePersistDequeues = 0; - - byteDepth = 0; - byteDepthLow = 0; - byteDepthHigh = 0; - - enqueueTxStarts = 0; - enqueueTxCommits = 0; - enqueueTxRejects = 0; - dequeueTxStarts = 0; - dequeueTxCommits = 0; - dequeueTxRejects = 0; - - enqueueTxCount = 0; - enqueueTxCountLow = 0; - enqueueTxCountHigh = 0; - - dequeueTxCount = 0; - dequeueTxCountLow = 0; - dequeueTxCountHigh = 0; - - consumers = 0; - consumersLow = 0; - consumersHigh = 0; -} - -ManagementObjectQueue::~ManagementObjectQueue () {} - -void ManagementObjectQueue::writeSchema (Buffer& buf) -{ - schemaNeeded = false; - - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT32, "vhostRef", "Virtual Host Ref", true); - schemaItem (buf, TYPE_STRING, "name", "Queue Name", true); - schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); - schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); - schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); - schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues", "Transactional messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgTxnDequeues", "Transactional messages dequeued"); - schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); - schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); - schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); - schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); - schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); - schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); - schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues", "Transactional messages enqueued"); - schemaItem (buf, TYPE_UINT64, "byteTxnDequeues", "Transactional messages dequeued"); - schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); - schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); - schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); - schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); - schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); - schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts", "Total enqueue transactions started "); - schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits", "Total enqueue transactions committed"); - schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects", "Total enqueue transactions rejected"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCount", "Current pending enqueue transactions"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow", "Low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this interval"); - schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts", "Total dequeue transactions started "); - schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits", "Total dequeue transactions committed"); - schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects", "Total dequeue transactions rejected"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCount", "Current pending dequeue transactions"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow", "Transaction low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high water mark this interval"); - schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); - schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); - schemaListEnd (buf); -} - -void ManagementObjectQueue::writeConfig (Buffer& buf) -{ - configChanged = false; - - writeTimestamps (buf); - buf.putLong (vhostRef); - buf.putShortString (name); - buf.putOctet (durable ? 1 : 0); - buf.putOctet (autoDelete ? 1 : 0); -} - -void ManagementObjectQueue::writeInstrumentation (Buffer& buf) -{ - instChanged = false; - - writeTimestamps (buf); - buf.putLongLong (msgTotalEnqueues); - buf.putLongLong (msgTotalDequeues); - buf.putLongLong (msgTxEnqueues); - buf.putLongLong (msgTxDequeues); - buf.putLongLong (msgPersistEnqueues); - buf.putLongLong (msgPersistDequeues); - buf.putLong (msgDepth); - buf.putLong (msgDepthLow); - buf.putLong (msgDepthHigh); - buf.putLongLong (byteTotalEnqueues); - buf.putLongLong (byteTotalDequeues); - buf.putLongLong (byteTxEnqueues); - buf.putLongLong (byteTxDequeues); - buf.putLongLong (bytePersistEnqueues); - buf.putLongLong (bytePersistDequeues); - buf.putLong (byteDepth); - buf.putLong (byteDepthLow); - buf.putLong (byteDepthHigh); - buf.putLongLong (enqueueTxStarts); - buf.putLongLong (enqueueTxCommits); - buf.putLongLong (enqueueTxRejects); - buf.putLong (enqueueTxCount); - buf.putLong (enqueueTxCountLow); - buf.putLong (enqueueTxCountHigh); - buf.putLongLong (dequeueTxStarts); - buf.putLongLong (dequeueTxCommits); - buf.putLongLong (dequeueTxRejects); - buf.putLong (dequeueTxCount); - buf.putLong (dequeueTxCountLow); - buf.putLong (dequeueTxCountHigh); - buf.putLong (consumers); - buf.putLong (consumersLow); - buf.putLong (consumersHigh); - - msgDepthLow = msgDepth; - msgDepthHigh = msgDepth; - byteDepthLow = byteDepth; - byteDepthHigh = byteDepth; - enqueueTxCountLow = enqueueTxCount; - enqueueTxCountHigh = enqueueTxCount; - dequeueTxCountLow = dequeueTxCount; - dequeueTxCountHigh = dequeueTxCount; - consumersLow = consumers; - consumersHigh = consumers; -} diff --git a/cpp/src/qpid/broker/management/ManagementObjectQueue.h b/cpp/src/qpid/broker/management/ManagementObjectQueue.h index cedbf81809..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectQueue.h +++ b/cpp/src/qpid/broker/management/ManagementObjectQueue.h @@ -1,181 +0,0 @@ -#ifndef _ManagementObjectQueue_ -#define _ManagementObjectQueue_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" - -namespace qpid { -namespace broker { - -const uint32_t MSG_MASK_TX = 1; // Transactional message -const uint32_t MSG_MASK_PERSIST = 2; // Persistent message - -class ManagementObjectQueue : public ManagementObject -{ - private: - - static bool schemaNeeded; - - uint32_t vhostRef; - std::string name; - bool durable; - bool autoDelete; - - uint64_t msgTotalEnqueues; // Total messages enqueued - uint64_t msgTotalDequeues; // Total messages dequeued - uint64_t msgTxEnqueues; // Transactional messages enqueued - uint64_t msgTxDequeues; // Transactional messages dequeued - uint64_t msgPersistEnqueues; // Persistent messages enqueued - uint64_t msgPersistDequeues; // Persistent messages dequeued - - uint32_t msgDepth; // Current size of queue in messages - uint32_t msgDepthLow; // Low-water queue size, this interval - uint32_t msgDepthHigh; // High-water queue size, this interval - - uint64_t byteTotalEnqueues; // Total messages enqueued - uint64_t byteTotalDequeues; // Total messages dequeued - uint64_t byteTxEnqueues; // Transactional messages enqueued - uint64_t byteTxDequeues; // Transactional messages dequeued - uint64_t bytePersistEnqueues; // Persistent messages enqueued - uint64_t bytePersistDequeues; // Persistent messages dequeued - - uint32_t byteDepth; // Current size of queue in bytes - uint32_t byteDepthLow; // Low-water mark this interval - uint32_t byteDepthHigh; // High-water mark this interval - - uint64_t enqueueTxStarts; // Total enqueue transactions started - uint64_t enqueueTxCommits; // Total enqueue transactions committed - uint64_t enqueueTxRejects; // Total enqueue transactions rejected - - uint32_t enqueueTxCount; // Current pending enqueue transactions - uint32_t enqueueTxCountLow; // Low water mark this interval - uint32_t enqueueTxCountHigh; // High water mark this interval - - uint64_t dequeueTxStarts; // Total dequeue transactions started - uint64_t dequeueTxCommits; // Total dequeue transactions committed - uint64_t dequeueTxRejects; // Total dequeue transactions rejected - - uint32_t dequeueTxCount; // Current pending dequeue transactions - uint32_t dequeueTxCountLow; // Low water mark this interval - uint32_t dequeueTxCountHigh; // High water mark this interval - - uint32_t consumers; // Current consumers on queue - uint32_t consumersLow; // Low water mark this interval - uint32_t consumersHigh; // High water mark this interval - - uint16_t getObjectType (void) { return OBJECT_QUEUE; } - std::string getObjectName (void) { return "queue"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& buf); - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - - inline void adjustQueueHiLo (void){ - if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; - if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; - - if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; - if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; - instChanged = true; - } - - inline void adjustTxHiLo (void){ - if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; - if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; - if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; - if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; - instChanged = true; - } - - inline void adjustConsumerHiLo (void){ - if (consumers > consumersHigh) consumersHigh = consumers; - if (consumers < consumersLow) consumersLow = consumers; - instChanged = true; - } - - public: - - typedef boost::shared_ptr<ManagementObjectQueue> shared_ptr; - - ManagementObjectQueue (uint32_t _vhostRef, std::string& name, - bool durable, bool autoDelete); - ~ManagementObjectQueue (void); - - // The following mask contents are used to describe enqueued or dequeued - // messages when counting statistics. - - inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ - msgTotalEnqueues++; - byteTotalEnqueues += bytes; - - if (attrMask & MSG_MASK_TX){ - msgTxEnqueues++; - byteTxEnqueues += bytes; - } - - if (attrMask & MSG_MASK_PERSIST){ - msgPersistEnqueues++; - bytePersistEnqueues += bytes; - } - - msgDepth++; - byteDepth += bytes; - adjustQueueHiLo (); - } - - inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ - msgTotalDequeues++; - byteTotalDequeues += bytes; - - if (attrMask & MSG_MASK_TX){ - msgTxDequeues++; - byteTxDequeues += bytes; - } - - if (attrMask & MSG_MASK_PERSIST){ - msgPersistDequeues++; - bytePersistDequeues += bytes; - } - - msgDepth--; - byteDepth -= bytes; - adjustQueueHiLo (); - } - - inline void incConsumers (void){ - consumers++; - adjustConsumerHiLo (); - } - - inline void decConsumers (void){ - consumers--; - adjustConsumerHiLo (); - } -}; - -}} - - - -#endif /*!_ManagementObjectQueue_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp b/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp index b5c8b3b0b7..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp +++ b/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Broker.h" -#include "ManagementObjectVhost.h" - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; - -bool ManagementObjectVhost::schemaNeeded = true; - -ManagementObjectVhost::ManagementObjectVhost (uint32_t _sysRef, const Options& /*_conf*/) : - sysRef(_sysRef), name("/") {} - -ManagementObjectVhost::~ManagementObjectVhost () {} - -void ManagementObjectVhost::writeSchema (Buffer& buf) -{ - schemaNeeded = false; - - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT32, "brokerRef", "Broker Reference" , true); - schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true); - schemaListEnd (buf); -} - -void ManagementObjectVhost::writeConfig (Buffer& buf) -{ - configChanged = false; - - writeTimestamps (buf); - buf.putLong (sysRef); - buf.putShortString (name); -} - diff --git a/cpp/src/qpid/broker/management/ManagementObjectVhost.h b/cpp/src/qpid/broker/management/ManagementObjectVhost.h index c36acb3487..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectVhost.h +++ b/cpp/src/qpid/broker/management/ManagementObjectVhost.h @@ -1,62 +0,0 @@ -#ifndef _ManagementObjectVhost_ -#define _ManagementObjectVhost_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" -#include "qpid/Options.h" -#include "boost/shared_ptr.hpp" - -namespace qpid { -namespace broker { - -class ManagementObjectVhost : public ManagementObject -{ - public: - - typedef boost::shared_ptr<ManagementObjectVhost> shared_ptr; - - ManagementObjectVhost (uint32_t sysRef, const Options& conf); - ~ManagementObjectVhost (void); - - private: - - static bool schemaNeeded; - - uint32_t sysRef; - std::string name; - - uint16_t getObjectType (void) { return OBJECT_VHOST; } - std::string getObjectName (void) { return "vhost"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - - inline bool getInstChanged (void) { return false; } -}; - -}} - - -#endif /*!_ManagementObjectVhost_*/ diff --git a/cpp/src/qpid/management/Args.h b/cpp/src/qpid/management/Args.h new file mode 100644 index 0000000000..75d0b4dd70 --- /dev/null +++ b/cpp/src/qpid/management/Args.h @@ -0,0 +1,39 @@ +#ifndef _Args_ +#define _Args_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + + +namespace qpid { +namespace management { + +class Args +{ + public: + + virtual ~Args (void) = 0; + +}; + +inline Args::~Args (void) {} + +}} + +#endif /*!_Args_*/ diff --git a/cpp/src/qpid/management/ArgsBrokerEcho.h b/cpp/src/qpid/management/ArgsBrokerEcho.h new file mode 100644 index 0000000000..ad9d7e0813 --- /dev/null +++ b/cpp/src/qpid/management/ArgsBrokerEcho.h @@ -0,0 +1,38 @@ +#ifndef _ArgsBrokerEcho_ +#define _ArgsBrokerEcho_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "Args.h" +#include <string> + +namespace qpid { +namespace management { + +class ArgsBrokerEcho : public Args +{ + public: + uint32_t io_sequence; + std::string io_body; +}; + +}} + +#endif /*!_ArgsBrokerEcho_*/ diff --git a/cpp/src/qpid/management/Broker.cpp b/cpp/src/qpid/management/Broker.cpp new file mode 100644 index 0000000000..8626654c43 --- /dev/null +++ b/cpp/src/qpid/management/Broker.cpp @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "config.h" +#include "qpid/broker/Broker.h" +#include "Broker.h" +#include "ArgsBrokerEcho.h" + +using namespace qpid::management; +using namespace qpid::sys; +using namespace qpid::framing; + +bool Broker::schemaNeeded = true; + +Broker::Broker (Manageable* _core, const Options& _conf) : + ManagementObject (_core) +{ + broker::Broker::Options& conf = (broker::Broker::Options&) _conf; + + sysId = "sysId"; + port = conf.port; + workerThreads = conf.workerThreads; + maxConns = conf.maxConnections; + connBacklog = conf.connectionBacklog; + stagingThreshold = conf.stagingThreshold; + storeLib = conf.store; + asyncStore = conf.storeAsync; + mgmtPubInterval = conf.mgmtPubInterval; + initialDiskPageSize = 0; + initialPagesPerQueue = 0; + clusterName = ""; + version = PACKAGE_VERSION; +} + +Broker::~Broker () {} + +void Broker::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaListBegin (buf); + schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true); + schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true); + schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true); + schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true); + schemaItem (buf, TYPE_UINT16, "connBacklog", + "Connection backlog limit for listening socket", true); + schemaItem (buf, TYPE_UINT32, "stagingThreshold", + "Broker stages messages over this size to disk", true); + schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true); + schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true); + schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true); + schemaItem (buf, TYPE_UINT32, "initialDiskPageSize", + "Number of disk pages allocated for storage", true); + schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue", + "Number of disk pages allocated per queue", true); + schemaItem (buf, TYPE_STRING, "clusterName", + "Name of cluster this server is a member of, zero-length for standalone server", true); + schemaItem (buf, TYPE_STRING, "version", "Running software version", true); + schemaListEnd (buf); +} + +void Broker::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putLong (0); + buf.putShort (port); + buf.putShort (workerThreads); + buf.putShort (maxConns); + buf.putShort (connBacklog); + buf.putLong (stagingThreshold); + buf.putShortString (storeLib); + buf.putOctet (asyncStore ? 1 : 0); + buf.putShort (mgmtPubInterval); + buf.putLong (initialDiskPageSize); + buf.putLong (initialPagesPerQueue); + buf.putShortString (clusterName); + buf.putShortString (version); +} + +void Broker::doMethod (string methodName, + Buffer& inBuf, + Buffer& outBuf) +{ + if (methodName.compare ("echo") == 0) + { + ArgsBrokerEcho args; + uint32_t result; + + args.io_sequence = inBuf.getLong (); + inBuf.getLongString (args.io_body); + + result = coreObject->ManagementMethod (1, args); + + outBuf.putLong (result); + outBuf.putShortString ("OK"); + outBuf.putLong (args.io_sequence); + outBuf.putLongString (args.io_body); + } + else + { + outBuf.putLong (1); + outBuf.putShortString ("Unknown Method"); + } +} + diff --git a/cpp/src/qpid/management/Broker.h b/cpp/src/qpid/management/Broker.h new file mode 100644 index 0000000000..91fddd3724 --- /dev/null +++ b/cpp/src/qpid/management/Broker.h @@ -0,0 +1,76 @@ +#ifndef _ManagementBroker_ +#define _ManagementBroker_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" +#include "qpid/Options.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace management { + +class Broker : public ManagementObject +{ + public: + + typedef boost::shared_ptr<Broker> shared_ptr; + + Broker (Manageable* coreObject, const Options& conf); + ~Broker (void); + + private: + + static bool schemaNeeded; + + std::string sysId; + uint16_t port; + uint16_t workerThreads; + uint16_t maxConns; + uint16_t connBacklog; + uint32_t stagingThreshold; + std::string storeLib; + bool asyncStore; + uint16_t mgmtPubInterval; + uint32_t initialDiskPageSize; + uint32_t initialPagesPerQueue; + std::string clusterName; + std::string version; + + uint16_t getObjectType (void) { return OBJECT_BROKER; } + std::string getObjectName (void) { return "broker"; } + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string methodName, + qpid::framing::Buffer& inBuf, + qpid::framing::Buffer& outBuf); + + inline bool getInstChanged (void) { return false; } +}; + +}} + + +#endif /*!_ManagementBroker_*/ diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h new file mode 100644 index 0000000000..7c9b49be9a --- /dev/null +++ b/cpp/src/qpid/management/Manageable.h @@ -0,0 +1,67 @@ +#ifndef _Manageable_ +#define _Manageable_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "ManagementObject.h" +#include "Args.h" +#include "qpid/sys/Time.h" +#include <qpid/framing/Buffer.h> +#include <boost/shared_ptr.hpp> +#include <map> + +namespace qpid { +namespace management { + +class Manageable +{ + public: + + virtual ~Manageable (void) = 0; + + // status_t is a type used to pass completion status from the method handler. + // + typedef uint32_t status_t; + + static const status_t STATUS_OK = 0; + static const status_t STATUS_UNKNOWN_OBJECT = 1; + static const status_t STATUS_UNKNOWN_METHOD = 2; + + // Every "Manageable" object must hold a reference to exactly one + // management object. This object is always of a class derived from + // the pure-virtual "ManagementObject". + // + // This accessor function returns a shared_ptr to the management object. + // + virtual ManagementObject::shared_ptr GetManagementObject (void) const = 0; + + // Every "Manageable" object must implement ManagementMethod. This + // function is called when a remote management client invokes a method + // on this object. The input and output arguments are specific to the + // method being called and must be down-cast to the appropriate sub class + // before use. + virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0; +}; + +inline Manageable::~Manageable (void) {} + +}} + +#endif /*!_Manageable_*/ diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp new file mode 100644 index 0000000000..b5b4da09b8 --- /dev/null +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -0,0 +1,325 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementAgent.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include <qpid/broker/Message.h> +#include <qpid/broker/MessageDelivery.h> +#include <qpid/framing/AMQFrame.h> +#include <list> + +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::sys; + +ManagementAgent::shared_ptr ManagementAgent::agent; + +ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) +{ + timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); + nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); +} + +ManagementAgent::shared_ptr ManagementAgent::getAgent (void) +{ + if (agent.get () == 0) + agent = shared_ptr (new ManagementAgent (10)); + + return agent; +} + +void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange, + Exchange::shared_ptr _dexchange) +{ + mExchange = _mexchange; + dExchange = _dexchange; +} + +void ManagementAgent::addObject (ManagementObject::shared_ptr object) +{ + uint64_t objectId = nextObjectId++; + + object->setObjectId (objectId); + managementObjects[objectId] = object; +} + +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} + +void ManagementAgent::Periodic::fire () +{ + agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval))); + agent.PeriodicProcessing (); +} + +void ManagementAgent::clientAdded (void) +{ + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + object->setAllChanged (); + object->setSchemaNeeded (); + } +} + +void ManagementAgent::PeriodicProcessing (void) +{ +#define BUFSIZE 65536 +#define THRESHOLD 16384 + char msgChars[BUFSIZE]; + Buffer msgBuffer (msgChars, BUFSIZE); + uint32_t contentSize; + std::list<uint64_t> deleteList; + + if (managementObjects.empty ()) + return; + + Message::shared_ptr msg (new Message ()); + + // Build the magic number for the management message. + msgBuffer.putOctet ('A'); + msgBuffer.putOctet ('M'); + msgBuffer.putOctet ('0'); + msgBuffer.putOctet ('1'); + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = iter->second; + + if (object->getSchemaNeeded ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('S'); // opcode = Schema Record + msgBuffer.putOctet (0); // content-class = N/A + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeSchema (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getConfigChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('C'); // content-class = Configuration + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeConfig (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getInstChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('I'); // content-class = Instrumentation + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeInstrumentation (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->isDeleted ()) + deleteList.push_back (iter->first); + + // Temporary protection against buffer overrun. + // This needs to be replaced with frame fragmentation. + if (msgBuffer.available () < THRESHOLD) + break; + } + + msgBuffer.putOctet ('X'); // End-of-message + msgBuffer.putOctet (0); + msgBuffer.putShort (0); + msgBuffer.putLong (8); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + + AMQFrame method (0, MessageTransferBody(ProtocolVersion(), + 0, "qpid.management", 0, 0)); + AMQFrame header (0, AMQHeaderBody()); + AMQFrame content; + + content.setBody(AMQContentBody()); + content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(contentSize); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + mExchange->route (deliverable, "mgmt", 0); + + // Delete flagged objects + for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + { + managementObjects.erase (*iter); + } + deleteList.clear (); +} + +void ManagementAgent::dispatchCommand (Deliverable& deliverable, + const string& routingKey, + const FieldTable* /*args*/) +{ + size_t pos, start; + Message& msg = ((DeliverableMessage&) deliverable).getMessage (); + uint32_t contentSize; + + if (routingKey.compare (0, 7, "method.") != 0) + { + QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); + return; + } + + start = 7; + if (routingKey.length () == start) + { + QPID_LOG (debug, "Missing package-name in routing key: " << routingKey); + return; + } + + pos = routingKey.find ('.', start); + if (pos == string::npos || routingKey.length () == pos + 1) + { + QPID_LOG (debug, "Missing class-name in routing key: " << routingKey); + return; + } + + string packageName = routingKey.substr (start, pos - start); + + start = pos + 1; + pos = routingKey.find ('.', start); + if (pos == string::npos || routingKey.length () == pos + 1) + { + QPID_LOG (debug, "Missing method-name in routing key: " << routingKey); + return; + } + + string className = routingKey.substr (start, pos - start); + + start = pos + 1; + string methodName = routingKey.substr (start, routingKey.length () - start); + + QPID_LOG (debug, "Dispatch package: " << packageName << ", class: " + << className << ", method: " << methodName); + + contentSize = msg.encodedContentSize (); + if (contentSize < 8 || contentSize > 65536) + return; + + char *inMem = new char[contentSize]; + char outMem[4096]; // TODO Fix This + Buffer inBuffer (inMem, contentSize); + Buffer outBuffer (outMem, 4096); + + msg.encodeContent (inBuffer); + inBuffer.reset (); + + uint32_t methodId = inBuffer.getLong (); + uint64_t objId = inBuffer.getLongLong (); + string replyTo; + + inBuffer.getShortString (replyTo); + + QPID_LOG (debug, " len = " << contentSize << ", methodId = " << + methodId << ", objId = " << objId); + + outBuffer.putLong (methodId); + + ManagementObjectMap::iterator iter = managementObjects.find (objId); + if (iter == managementObjects.end ()) + { + outBuffer.putLong (2); + outBuffer.putShortString ("Invalid Object Id"); + } + else + { + iter->second->doMethod (methodName, inBuffer, outBuffer); + } + + Message::shared_ptr outMsg (new Message ()); + uint32_t msgSize = 4096 - outBuffer.available (); + outBuffer.reset (); + AMQFrame method (0, MessageTransferBody(ProtocolVersion(), + 0, "amq.direct", 0, 0)); + AMQFrame header (0, AMQHeaderBody()); + AMQFrame content; + + content.setBody(AMQContentBody()); + content.castBody<AMQContentBody>()->decode(outBuffer, msgSize); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + outMsg->getFrames().append(method); + outMsg->getFrames().append(header); + + MessageProperties* props = outMsg->getFrames().getHeaders()->get<MessageProperties>(true); + props->setContentLength(msgSize); + outMsg->getFrames().append(content); + + DeliverableMessage outDeliverable (outMsg); + dExchange->route (outDeliverable, replyTo, 0); + + free (inMem); +} + diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h new file mode 100644 index 0000000000..e84c0478f3 --- /dev/null +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -0,0 +1,81 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Timer.h" +#include "ManagementObject.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace management { + +class ManagementAgent +{ + private: + + ManagementAgent (uint16_t interval); + + public: + + typedef boost::shared_ptr<ManagementAgent> shared_ptr; + + static shared_ptr getAgent (void); + + void setInterval (uint16_t _interval) { interval = _interval; } + void setExchange (broker::Exchange::shared_ptr mgmtExchange, + broker::Exchange::shared_ptr directExchange); + void addObject (ManagementObject::shared_ptr object); + void clientAdded (void); + void dispatchCommand (broker::Deliverable& msg, + const std::string& routingKey, + const qpid::framing::FieldTable* args); + + private: + + struct Periodic : public broker::TimerTask + { + ManagementAgent& agent; + + Periodic (ManagementAgent& agent, uint32_t seconds); + ~Periodic () {} + void fire (); + }; + + static shared_ptr agent; + ManagementObjectMap managementObjects; + broker::Timer timer; + broker::Exchange::shared_ptr mExchange; + broker::Exchange::shared_ptr dExchange; + uint16_t interval; + uint64_t nextObjectId; + + void PeriodicProcessing (void); +}; + +}} + + + +#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp new file mode 100644 index 0000000000..f18b6fc402 --- /dev/null +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementExchange.h" +#include "qpid/log/Statement.h" + +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementExchange::ManagementExchange (const string& _name) : + Exchange (_name), TopicExchange(_name) {} +ManagementExchange::ManagementExchange (const std::string& _name, + bool _durable, + const FieldTable& _args) : + Exchange (_name, _durable, _args), + TopicExchange(_name, _durable, _args) {} + + +bool ManagementExchange::bind (Queue::shared_ptr queue, + const string& routingKey, + const FieldTable* args) +{ + bool result = TopicExchange::bind (queue, routingKey, args); + + // Notify the management agent that a new management client has bound to the + // exchange. + if (result) + managementAgent->clientAdded (); + + return result; +} + +void ManagementExchange::route (Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + // Intercept management commands + if (routingKey.length () > 7 && + routingKey.substr (0, 7).compare ("method.") == 0) + { + managementAgent->dispatchCommand (msg, routingKey, args); + return; + } + + TopicExchange::route (msg, routingKey, args); +} + +void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) +{ + managementAgent = agent; +} + + +ManagementExchange::~ManagementExchange() {} + +const std::string ManagementExchange::typeName("management"); + diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h new file mode 100644 index 0000000000..6ccdf47182 --- /dev/null +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementExchange_ +#define _ManagementExchange_ + +#include "qpid/broker/TopicExchange.h" +#include "ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementExchange : public virtual TopicExchange +{ + private: + management::ManagementAgent::shared_ptr managementAgent; + + public: + static const std::string typeName; + + ManagementExchange (const string& name); + ManagementExchange (const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); + + virtual std::string getType() const { return typeName; } + + virtual bool bind (Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args); + + virtual void route (Deliverable& msg, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent (management::ManagementAgent::shared_ptr agent); + + virtual ~ManagementExchange(); +}; + + +} +} + +#endif diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp new file mode 100644 index 0000000000..24588b4edd --- /dev/null +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" +#include "ManagementObject.h" + +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::sys; + +void ManagementObject::schemaItem (Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig, + bool isIndex) +{ + uint8_t flags = + (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); + + buf.putOctet (flags); + buf.putOctet (typeCode); + buf.putShortString (name); + buf.putShortString (description); +} + +void ManagementObject::schemaListBegin (Buffer& buf) +{ + schemaItem (buf, TYPE_UINT64, "id", "Object ID", true, true); +} + +void ManagementObject::schemaListEnd (Buffer& buf) +{ + buf.putOctet (FLAG_END); +} + +void ManagementObject::writeTimestamps (Buffer& buf) +{ + buf.putLongLong (uint64_t (Duration (now ()))); + buf.putLongLong (createTime); + buf.putLongLong (destroyTime); + buf.putLongLong (objectId); +} diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h new file mode 100644 index 0000000000..416a477796 --- /dev/null +++ b/cpp/src/qpid/management/ManagementObject.h @@ -0,0 +1,125 @@ +#ifndef _ManagementObject_ +#define _ManagementObject_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" +#include "qpid/sys/Time.h" +#include <qpid/framing/Buffer.h> +#include <boost/shared_ptr.hpp> +#include <map> + +namespace qpid { +namespace management { + +const uint16_t OBJECT_SYSTEM = 1; +const uint16_t OBJECT_BROKER = 2; +const uint16_t OBJECT_VHOST = 3; +const uint16_t OBJECT_QUEUE = 4; +const uint16_t OBJECT_EXCHANGE = 5; +const uint16_t OBJECT_BINDING = 6; +const uint16_t OBJECT_CLIENT = 7; +const uint16_t OBJECT_SESSION = 8; +const uint16_t OBJECT_DESTINATION = 9; +const uint16_t OBJECT_PRODUCER = 10; +const uint16_t OBJECT_CONSUMER = 11; + +class Manageable; + +class ManagementObject +{ + protected: + + uint64_t createTime; + uint64_t destroyTime; + uint64_t objectId; + bool configChanged; + bool instChanged; + bool deleted; + Manageable* coreObject; + + static const uint8_t TYPE_UINT8 = 1; + static const uint8_t TYPE_UINT16 = 2; + static const uint8_t TYPE_UINT32 = 3; + static const uint8_t TYPE_UINT64 = 4; + static const uint8_t TYPE_BOOL = 5; + static const uint8_t TYPE_STRING = 6; + + static const uint8_t FLAG_CONFIG = 0x01; + static const uint8_t FLAG_INDEX = 0x02; + static const uint8_t FLAG_END = 0x80; + + void schemaItem (qpid::framing::Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig = false, + bool isIndex = false); + void schemaListBegin (qpid::framing::Buffer& buf); + void schemaListEnd (qpid::framing::Buffer& buf); + void writeTimestamps (qpid::framing::Buffer& buf); + + public: + typedef boost::shared_ptr<ManagementObject> shared_ptr; + + ManagementObject (Manageable* _core) : + destroyTime(0), objectId (0), configChanged(true), + instChanged(true), deleted(false), coreObject(_core) + { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } + virtual ~ManagementObject () {} + + virtual uint16_t getObjectType (void) = 0; + virtual std::string getObjectName (void) = 0; + virtual void writeSchema (qpid::framing::Buffer& buf) = 0; + virtual void writeConfig (qpid::framing::Buffer& buf) = 0; + virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; + virtual bool getSchemaNeeded (void) = 0; + virtual void setSchemaNeeded (void) = 0; + virtual void doMethod (std::string methodName, + qpid::framing::Buffer& inBuf, + qpid::framing::Buffer& outBuf) = 0; + + void setObjectId (uint64_t oid) { objectId = oid; } + uint64_t getObjectId (void) { return objectId; } + inline bool getConfigChanged (void) { return configChanged; } + virtual bool getInstChanged (void) { return instChanged; } + inline void setAllChanged (void) + { + configChanged = true; + instChanged = true; + } + + inline void resourceDestroy (void) { + destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); + deleted = true; + } + bool isDeleted (void) { return deleted; } + +}; + + typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap; + +}} + + + +#endif /*!_ManagementObject_*/ diff --git a/cpp/src/qpid/management/Queue.cpp b/cpp/src/qpid/management/Queue.cpp new file mode 100644 index 0000000000..3c82877ebd --- /dev/null +++ b/cpp/src/qpid/management/Queue.cpp @@ -0,0 +1,189 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/log/Statement.h" +#include "Manageable.h" +#include "Queue.h" + +using namespace qpid::management; +using namespace qpid::sys; +using namespace qpid::framing; + +bool Queue::schemaNeeded = true; + +Queue::Queue (Manageable* _core, Manageable* _parent, + const std::string& _name, + bool _durable, bool _autoDelete) : + ManagementObject(_core), name(_name), + durable(_durable), autoDelete(_autoDelete) +{ + vhostRef = _parent->GetManagementObject ()->getObjectId (); + + msgTotalEnqueues = 0; + msgTotalDequeues = 0; + msgTxEnqueues = 0; + msgTxDequeues = 0; + msgPersistEnqueues = 0; + msgPersistDequeues = 0; + + msgDepth = 0; + msgDepthLow = 0; + msgDepthHigh = 0; + + byteTotalEnqueues = 0; + byteTotalDequeues = 0; + byteTxEnqueues = 0; + byteTxDequeues = 0; + bytePersistEnqueues = 0; + bytePersistDequeues = 0; + + byteDepth = 0; + byteDepthLow = 0; + byteDepthHigh = 0; + + enqueueTxStarts = 0; + enqueueTxCommits = 0; + enqueueTxRejects = 0; + dequeueTxStarts = 0; + dequeueTxCommits = 0; + dequeueTxRejects = 0; + + enqueueTxCount = 0; + enqueueTxCountLow = 0; + enqueueTxCountHigh = 0; + + dequeueTxCount = 0; + dequeueTxCountLow = 0; + dequeueTxCountHigh = 0; + + consumers = 0; + consumersLow = 0; + consumersHigh = 0; +} + +Queue::~Queue () {} + +void Queue::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaListBegin (buf); + schemaItem (buf, TYPE_UINT64, "vhostRef", "Virtual Host Ref", true); + schemaItem (buf, TYPE_STRING, "name", "Queue Name", true); + schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); + schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); + schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTxnDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); + schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); + schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); + schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTxnDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); + schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); + schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); + schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts", "Total enqueue transactions started "); + schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits", "Total enqueue transactions committed"); + schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects", "Total enqueue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "enqueueTxnCount", "Current pending enqueue transactions"); + schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow", "Low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this interval"); + schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts", "Total dequeue transactions started "); + schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits", "Total dequeue transactions committed"); + schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects", "Total dequeue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "dequeueTxnCount", "Current pending dequeue transactions"); + schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow", "Transaction low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); + schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); + schemaListEnd (buf); +} + +void Queue::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putLongLong (vhostRef); + buf.putShortString (name); + buf.putOctet (durable ? 1 : 0); + buf.putOctet (autoDelete ? 1 : 0); +} + +void Queue::writeInstrumentation (Buffer& buf) +{ + instChanged = false; + + writeTimestamps (buf); + buf.putLongLong (msgTotalEnqueues); + buf.putLongLong (msgTotalDequeues); + buf.putLongLong (msgTxEnqueues); + buf.putLongLong (msgTxDequeues); + buf.putLongLong (msgPersistEnqueues); + buf.putLongLong (msgPersistDequeues); + buf.putLong (msgDepth); + buf.putLong (msgDepthLow); + buf.putLong (msgDepthHigh); + buf.putLongLong (byteTotalEnqueues); + buf.putLongLong (byteTotalDequeues); + buf.putLongLong (byteTxEnqueues); + buf.putLongLong (byteTxDequeues); + buf.putLongLong (bytePersistEnqueues); + buf.putLongLong (bytePersistDequeues); + buf.putLong (byteDepth); + buf.putLong (byteDepthLow); + buf.putLong (byteDepthHigh); + buf.putLongLong (enqueueTxStarts); + buf.putLongLong (enqueueTxCommits); + buf.putLongLong (enqueueTxRejects); + buf.putLong (enqueueTxCount); + buf.putLong (enqueueTxCountLow); + buf.putLong (enqueueTxCountHigh); + buf.putLongLong (dequeueTxStarts); + buf.putLongLong (dequeueTxCommits); + buf.putLongLong (dequeueTxRejects); + buf.putLong (dequeueTxCount); + buf.putLong (dequeueTxCountLow); + buf.putLong (dequeueTxCountHigh); + buf.putLong (consumers); + buf.putLong (consumersLow); + buf.putLong (consumersHigh); + + msgDepthLow = msgDepth; + msgDepthHigh = msgDepth; + byteDepthLow = byteDepth; + byteDepthHigh = byteDepth; + enqueueTxCountLow = enqueueTxCount; + enqueueTxCountHigh = enqueueTxCount; + dequeueTxCountLow = dequeueTxCount; + dequeueTxCountHigh = dequeueTxCount; + consumersLow = consumers; + consumersHigh = consumers; +} diff --git a/cpp/src/qpid/management/Queue.h b/cpp/src/qpid/management/Queue.h new file mode 100644 index 0000000000..3a7fdf5263 --- /dev/null +++ b/cpp/src/qpid/management/Queue.h @@ -0,0 +1,184 @@ +#ifndef _ManagementQueue_ +#define _ManagementQueue_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" + +namespace qpid { +namespace management { + +const uint32_t MSG_MASK_TX = 1; // Transactional message +const uint32_t MSG_MASK_PERSIST = 2; // Persistent message + +class Queue : public ManagementObject +{ + private: + + static bool schemaNeeded; + + uint64_t vhostRef; + std::string name; + bool durable; + bool autoDelete; + + uint64_t msgTotalEnqueues; // Total messages enqueued + uint64_t msgTotalDequeues; // Total messages dequeued + uint64_t msgTxEnqueues; // Transactional messages enqueued + uint64_t msgTxDequeues; // Transactional messages dequeued + uint64_t msgPersistEnqueues; // Persistent messages enqueued + uint64_t msgPersistDequeues; // Persistent messages dequeued + + uint32_t msgDepth; // Current size of queue in messages + uint32_t msgDepthLow; // Low-water queue size, this interval + uint32_t msgDepthHigh; // High-water queue size, this interval + + uint64_t byteTotalEnqueues; // Total messages enqueued + uint64_t byteTotalDequeues; // Total messages dequeued + uint64_t byteTxEnqueues; // Transactional messages enqueued + uint64_t byteTxDequeues; // Transactional messages dequeued + uint64_t bytePersistEnqueues; // Persistent messages enqueued + uint64_t bytePersistDequeues; // Persistent messages dequeued + + uint32_t byteDepth; // Current size of queue in bytes + uint32_t byteDepthLow; // Low-water mark this interval + uint32_t byteDepthHigh; // High-water mark this interval + + uint64_t enqueueTxStarts; // Total enqueue transactions started + uint64_t enqueueTxCommits; // Total enqueue transactions committed + uint64_t enqueueTxRejects; // Total enqueue transactions rejected + + uint32_t enqueueTxCount; // Current pending enqueue transactions + uint32_t enqueueTxCountLow; // Low water mark this interval + uint32_t enqueueTxCountHigh; // High water mark this interval + + uint64_t dequeueTxStarts; // Total dequeue transactions started + uint64_t dequeueTxCommits; // Total dequeue transactions committed + uint64_t dequeueTxRejects; // Total dequeue transactions rejected + + uint32_t dequeueTxCount; // Current pending dequeue transactions + uint32_t dequeueTxCountLow; // Low water mark this interval + uint32_t dequeueTxCountHigh; // High water mark this interval + + uint32_t consumers; // Current consumers on queue + uint32_t consumersLow; // Low water mark this interval + uint32_t consumersHigh; // High water mark this interval + + uint16_t getObjectType (void) { return OBJECT_QUEUE; } + std::string getObjectName (void) { return "queue"; } + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& buf); + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string /*methodName*/, + qpid::framing::Buffer& /*inBuf*/, + qpid::framing::Buffer& /*outBuf*/) {} + + inline void adjustQueueHiLo (void){ + if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; + if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; + + if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; + if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; + instChanged = true; + } + + inline void adjustTxHiLo (void){ + if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; + if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; + if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; + if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; + instChanged = true; + } + + inline void adjustConsumerHiLo (void){ + if (consumers > consumersHigh) consumersHigh = consumers; + if (consumers < consumersLow) consumersLow = consumers; + instChanged = true; + } + + public: + + typedef boost::shared_ptr<Queue> shared_ptr; + + Queue (Manageable* coreObject, Manageable* parentObject, + const std::string& name, bool durable, bool autoDelete); + ~Queue (void); + + // The following mask contents are used to describe enqueued or dequeued + // messages when counting statistics. + + inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalEnqueues++; + byteTotalEnqueues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxEnqueues++; + byteTxEnqueues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistEnqueues++; + bytePersistEnqueues += bytes; + } + + msgDepth++; + byteDepth += bytes; + adjustQueueHiLo (); + } + + inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalDequeues++; + byteTotalDequeues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxDequeues++; + byteTxDequeues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistDequeues++; + bytePersistDequeues += bytes; + } + + msgDepth--; + byteDepth -= bytes; + adjustQueueHiLo (); + } + + inline void incConsumers (void){ + consumers++; + adjustConsumerHiLo (); + } + + inline void decConsumers (void){ + consumers--; + adjustConsumerHiLo (); + } +}; + +}} + + + +#endif /*!_ManagementQueue_*/ diff --git a/cpp/src/qpid/management/Vhost.cpp b/cpp/src/qpid/management/Vhost.cpp new file mode 100644 index 0000000000..effcb1599c --- /dev/null +++ b/cpp/src/qpid/management/Vhost.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" +#include "Vhost.h" + +using namespace qpid::management; +using namespace qpid::sys; +using namespace qpid::framing; + +bool Vhost::schemaNeeded = true; + +Vhost::Vhost (Manageable* _core, Manageable* _parent) : + ManagementObject (_core), name("/") +{ + brokerRef = _parent->GetManagementObject ()->getObjectId (); +} + +Vhost::~Vhost () {} + +void Vhost::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaListBegin (buf); + schemaItem (buf, TYPE_UINT64, "brokerRef", "Broker Reference" , true); + schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true); + schemaListEnd (buf); +} + +void Vhost::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putLongLong (brokerRef); + buf.putShortString (name); +} + diff --git a/cpp/src/qpid/management/Vhost.h b/cpp/src/qpid/management/Vhost.h new file mode 100644 index 0000000000..5fc5a2870b --- /dev/null +++ b/cpp/src/qpid/management/Vhost.h @@ -0,0 +1,65 @@ +#ifndef _ManagementVhost_ +#define _ManagementVhost_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Manageable.h" +#include "ManagementObject.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace management { + +class Vhost : public ManagementObject +{ + public: + + typedef boost::shared_ptr<Vhost> shared_ptr; + + Vhost (Manageable* coreObject, Manageable* parentObject); + ~Vhost (void); + + private: + + static bool schemaNeeded; + + uint64_t brokerRef; + std::string name; + + uint16_t getObjectType (void) { return OBJECT_VHOST; } + std::string getObjectName (void) { return "vhost"; } + void writeSchema (qpid::framing::Buffer& buf); + void writeConfig (qpid::framing::Buffer& buf); + void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + void doMethod (std::string /*methodName*/, + qpid::framing::Buffer& /*inBuf*/, + qpid::framing::Buffer& /*outBuf*/) {} + + inline bool getInstChanged (void) { return false; } +}; + +}} + + +#endif /*!_ManagementVhost_*/ |
