diff options
Diffstat (limited to 'cpp/src/qpid/broker')
23 files changed, 204 insertions, 1364 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 68590d3331..d61100d255 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,7 +28,8 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" -#include "management/ManagementExchange.h" +#include "qpid/management/ManagementExchange.h" +#include "qpid/management/ArgsBrokerEcho.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" @@ -50,6 +51,11 @@ using qpid::sys::Acceptor; using qpid::framing::HandlerUpdater; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::management::ArgsBrokerEcho; namespace qpid { namespace broker { @@ -89,7 +95,7 @@ Broker::Options::Options(const std::string& name) : ("store-async", optValue(storeAsync,"yes|no"), "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.") ("store-force", optValue(storeForce,"yes|no"), - "Force changing modes of store, will delete all existing data if mode is change. Be SHURE you want to do this") + "Force changing modes of store, will delete all existing data if mode is changed. Be SURE you want to do this") ("mgmt,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), @@ -115,33 +121,23 @@ Broker::Broker(const Broker::Options& conf) : sessionManager(conf.ack) { if(conf.enableMgmt){ - managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); + managementAgent = ManagementAgent::getAgent (); + managementAgent->setInterval (conf.mgmtPubInterval); - mgmtObject = ManagementObjectBroker::shared_ptr (new ManagementObjectBroker (conf)); - managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtObject)); + mgmtObject = management::Broker::shared_ptr (new management::Broker (this, conf)); + managementAgent->addObject (mgmtObject); - // Since there is currently no support for virtual hosts, a management object - // representing the implied single virtual host is added here. - mgmtVhostObject = ManagementObjectVhost::shared_ptr - (new ManagementObjectVhost (mgmtObject->getObjectId (), conf)); - managementAgent->addObject (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject)); + // Since there is currently no support for virtual hosts, a placeholder object + // representing the implied single virtual host is added here to keep the + // management schema correct. + Vhost* vhost = new Vhost (this); + vhostObject = Vhost::shared_ptr (vhost); - queues.setManagementAgent (managementAgent); - queues.setManagementVhost (dynamic_pointer_cast<ManagementObject>(mgmtVhostObject)); + queues.setParent (vhost); } exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - if(conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); - exchanges.declare(qpid_management, ManagementExchange::typeName); - Exchange::shared_ptr mExchange = exchanges.get (qpid_management); - managementAgent->setExchange (mExchange); - dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); - } - else - QPID_LOG(info, "Management not enabled"); - if(store.get()) { if (!store->init(conf.storeDir, conf.storeAsync, conf.storeForce)){ throw Exception( "Existing Journal in different mode, backup/move existing data \ @@ -158,6 +154,17 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_fanout, FanOutExchange::typeName); declareStandardExchange(amq_match, HeadersExchange::typeName); + if(conf.enableMgmt) { + QPID_LOG(info, "Management enabled"); + exchanges.declare(qpid_management, ManagementExchange::typeName); + Exchange::shared_ptr mExchange = exchanges.get (qpid_management); + Exchange::shared_ptr dExchange = exchanges.get (amq_direct); + managementAgent->setExchange (mExchange, dExchange); + dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent (managementAgent); + } + else + QPID_LOG(info, "Management not enabled"); + // Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); for (Plugin::Plugins::const_iterator i = plugins.begin(); @@ -236,5 +243,17 @@ void Broker::update(ChannelId channel, FrameHandler::Chains& chains) { channel, boost::ref(chains))); } +ManagementObject::shared_ptr Broker::GetManagementObject(void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Broker::ManagementMethod (uint32_t /*methodId*/, + Args& /*_args*/) +{ + QPID_LOG (debug, "Broker::ManagementMethod"); + return Manageable::STATUS_OK; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 6f4cc97936..0980c970d2 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -30,9 +30,10 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" -#include "management/ManagementAgent.h" -#include "management/ManagementObjectBroker.h" -#include "management/ManagementObjectVhost.h" +#include "Vhost.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/management/Broker.h" #include "qpid/Options.h" #include "qpid/Plugin.h" #include "qpid/Url.h" @@ -55,7 +56,7 @@ namespace broker { /** * A broker instance. */ -class Broker : public sys::Runnable, public Plugin::Target +class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable { public: struct Options : public qpid::Options { @@ -114,7 +115,10 @@ class Broker : public sys::Runnable, public Plugin::Target DtxManager& getDtxManager() { return dtxManager; } SessionManager& getSessionManager() { return sessionManager; } - ManagementAgent::shared_ptr getManagementAgent() { return managementAgent; } + + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); private: sys::Acceptor& getAcceptor() const; @@ -131,9 +135,9 @@ class Broker : public sys::Runnable, public Plugin::Target DtxManager dtxManager; HandlerUpdaters handlerUpdaters; SessionManager sessionManager; - ManagementAgent::shared_ptr managementAgent; - ManagementObjectBroker::shared_ptr mgmtObject; - ManagementObjectVhost::shared_ptr mgmtVhostObject; + management::ManagementAgent::shared_ptr managementAgent; + management::Broker::shared_ptr mgmtObject; + Vhost::shared_ptr vhostObject; static MessageStore* createStore(const Options& config); void declareStandardExchange(const std::string& name, const std::string& type); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 35660cfa0b..2f58e23c23 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -23,7 +23,7 @@ #include "FanOutExchange.h" #include "HeadersExchange.h" #include "TopicExchange.h" -#include "management/ManagementExchange.h" +#include "qpid/management/ManagementExchange.h" #include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 0065ed397c..fe412fd77b 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -89,6 +89,13 @@ void Message::encode(framing::Buffer& buffer) const frames.map_if(f2, TypeFilter(CONTENT_BODY)); } +void Message::encodeContent(framing::Buffer& buffer) const +{ + //encode the payload of each content frame + EncodeBody f2(buffer); + frames.map_if(f2, TypeFilter(CONTENT_BODY)); +} + uint32_t Message::encodedSize() const { return encodedHeaderSize() + encodedContentSize(); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 913cc759bc..7fe8628bf5 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -91,6 +91,7 @@ public: uint32_t getRequiredCredit() const; void encode(framing::Buffer& buffer) const; + void encodeContent(framing::Buffer& buffer) const; /** * @returns the size of the buffer needed to encode this diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 5745c85331..919343b152 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -36,10 +36,15 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, - const ConnectionToken* const _owner) : + const ConnectionToken* const _owner, + Manageable* parent) : name(_name), autodelete(_autodelete), @@ -50,9 +55,21 @@ Queue::Queue(const string& _name, bool _autodelete, serializer(false), dispatchCallback(*this) { + if (parent != 0) + { + mgmtObject = management::Queue::shared_ptr + (new management::Queue (this, parent, _name, _store != 0, _autodelete)); + + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject); + } } -Queue::~Queue(){} +Queue::~Queue() +{ + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} void Queue::notifyDurableIOComplete() { @@ -79,7 +96,7 @@ void Queue::deliver(Message::shared_ptr& msg){ mgmtObject->enqueue (msg->contentSize ()); }else { if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -92,7 +109,7 @@ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -102,10 +119,10 @@ void Queue::recover(Message::shared_ptr& msg){ void Queue::process(Message::shared_ptr& msg){ - uint32_t mask = MSG_MASK_TX; + uint32_t mask = management::MSG_MASK_TX; if (msg->isPersistent ()) - mask |= MSG_MASK_PERSIST; + mask |= management::MSG_MASK_PERSIST; push(msg); if (mgmtObject != 0) @@ -327,7 +344,7 @@ QueuedMessage Queue::dequeue(){ uint32_t mask = 0; if (msg.payload->isPersistent ()) - mask |= MSG_MASK_PERSIST; + mask |= management::MSG_MASK_PERSIST; mgmtObject->dequeue (msg.payload->contentSize (), mask); } @@ -571,3 +588,14 @@ void Queue::DispatchFunctor::operator()() if (sync) sync->completed(); } + +ManagementObject::shared_ptr Queue::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/, + Args& /*args*/) +{ + return Manageable::STATUS_OK; +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index a1bbe275da..4439ecbcc1 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -35,7 +35,8 @@ #include "PersistableQueue.h" #include "QueuePolicy.h" #include "QueueBindings.h" -#include "management/ManagementObjectQueue.h" +#include "qpid/management/Manageable.h" +#include "qpid/management/Queue.h" namespace qpid { namespace broker { @@ -59,7 +60,7 @@ namespace qpid { * registered consumers or be stored until dequeued or until one * or more consumers registers. */ - class Queue : public PersistableQueue { + class Queue : public PersistableQueue, public management::Manageable { typedef std::vector<Consumer::ptr> Consumers; typedef std::deque<QueuedMessage> Messages; @@ -94,7 +95,7 @@ namespace qpid { qpid::sys::Serializer<DispatchFunctor> serializer; DispatchFunctor dispatchCallback; framing::SequenceNumber sequence; - ManagementObjectQueue::shared_ptr mgmtObject; + management::Queue::shared_ptr mgmtObject; void pop(); void push(Message::shared_ptr& msg); @@ -122,7 +123,8 @@ namespace qpid { Queue(const string& name, bool autodelete = false, MessageStore* const store = 0, - const ConnectionToken* const owner = 0); + const ConnectionToken* const owner = 0, + Manageable* parent = 0); ~Queue(); void create(const qpid::framing::FieldTable& settings); @@ -130,8 +132,6 @@ namespace qpid { void destroy(); void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args); void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); - void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObject = mgmt; } - ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObject; } bool acquire(const QueuedMessage& msg); @@ -203,6 +203,11 @@ namespace qpid { static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer); static void tryAutoDelete(Broker& broker, Queue::shared_ptr); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args); }; } } diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index bc7f00c7ef..f246c653ea 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -19,8 +19,6 @@ * */ #include "QueueRegistry.h" -#include "management/ManagementAgent.h" -#include "management/ManagementObjectQueue.h" #include "qpid/log/Statement.h" #include <sstream> #include <assert.h> @@ -29,7 +27,7 @@ using namespace qpid::broker; using namespace qpid::sys; QueueRegistry::QueueRegistry(MessageStore* const _store) : - counter(1), store(_store) {} + counter(1), store(_store), parent(0) {} QueueRegistry::~QueueRegistry(){} @@ -43,17 +41,9 @@ QueueRegistry::declare(const string& declareName, bool durable, QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent)); queues[name] = queue; - if (managementAgent){ - ManagementObjectQueue::shared_ptr mgmtObject - (new ManagementObjectQueue (managementVhost->getObjectId (), name, durable, autoDelete)); - - queue->setMgmt (mgmtObject); - managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject)); - } - return std::pair<Queue::shared_ptr, bool>(queue, true); } else { return std::pair<Queue::shared_ptr, bool>(i->second, false); @@ -61,16 +51,6 @@ QueueRegistry::declare(const string& declareName, bool durable, } void QueueRegistry::destroyLH (const string& name){ - if (managementAgent){ - ManagementObjectQueue::shared_ptr mgmtObject; - QueueMap::iterator i = queues.find(name); - - if (i != queues.end()){ - mgmtObject = i->second->getMgmt (); - mgmtObject->resourceDestroy (); - } - } - queues.erase(name); } @@ -105,18 +85,3 @@ string QueueRegistry::generateName(){ MessageStore* const QueueRegistry::getStore() const { return store; } - -void QueueRegistry::setManagementAgent (ManagementAgent::shared_ptr agent) -{ - managementAgent = agent; -} - -ManagementAgent::shared_ptr QueueRegistry::getManagementAgent (void) -{ - return managementAgent; -} - -void QueueRegistry::setManagementVhost (ManagementObject::shared_ptr vhost) -{ - managementVhost = vhost; -} diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 6fc90a5527..653a27b1d8 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -24,7 +24,7 @@ #include <map> #include "qpid/sys/Mutex.h" #include "Queue.h" -#include "management/ManagementAgent.h" +#include "qpid/management/Manageable.h" namespace qpid { namespace broker { @@ -89,22 +89,19 @@ class QueueRegistry{ * Return the message store used. */ MessageStore* const getStore() const; - + /** - * Set/Get the ManagementAgent in use. + * Register the manageable parent for declared queues */ - void setManagementAgent (ManagementAgent::shared_ptr agent); - ManagementAgent::shared_ptr getManagementAgent (void); - void setManagementVhost (ManagementObject::shared_ptr vhost); - + void setParent (management::Manageable* _parent) { parent = _parent; } + private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; qpid::sys::RWlock lock; int counter; MessageStore* const store; - ManagementAgent::shared_ptr managementAgent; - ManagementObject::shared_ptr managementVhost; + management::Manageable* parent; }; diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp new file mode 100644 index 0000000000..bf0521904f --- /dev/null +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -0,0 +1,37 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "Vhost.h" +#include "qpid/management/ManagementAgent.h" + +using namespace qpid::broker; +using qpid::management::ManagementAgent; + +Vhost::Vhost (management::Manageable* parentBroker) +{ + if (parentBroker != 0) + { + mgmtObject = management::Vhost::shared_ptr + (new management::Vhost (this, parentBroker)); + + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject); + } +} + diff --git a/cpp/src/qpid/broker/Vhost.h b/cpp/src/qpid/broker/Vhost.h new file mode 100644 index 0000000000..b702dcebf0 --- /dev/null +++ b/cpp/src/qpid/broker/Vhost.h @@ -0,0 +1,51 @@ +#ifndef _Vhost_ +#define _Vhost_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/management/Manageable.h" +#include "qpid/management/Vhost.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class Vhost : public management::Manageable +{ + private: + + management::Vhost::shared_ptr mgmtObject; + + public: + + typedef boost::shared_ptr<Vhost> shared_ptr; + + Vhost (management::Manageable* parentBroker); + + management::ManagementObject::shared_ptr GetManagementObject (void) const + { return mgmtObject; } + + management::Manageable::status_t ManagementMethod (uint32_t, management::Args&) + { return management::Manageable::STATUS_OK; } +}; + +}} + +#endif /*!_Vhost_*/ diff --git a/cpp/src/qpid/broker/management/ManagementAgent.cpp b/cpp/src/qpid/broker/management/ManagementAgent.cpp index c52e330ff2..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementAgent.cpp +++ b/cpp/src/qpid/broker/management/ManagementAgent.cpp @@ -1,234 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementAgent.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/log/Statement.h" -#include <qpid/broker/Message.h> -#include <qpid/broker/MessageDelivery.h> -#include <qpid/framing/AMQFrame.h> -#include <list> - -using namespace qpid::framing; -using namespace qpid::broker; -using namespace qpid::sys; - -ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval), nextObjectId(1) -{ - timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); -} - -void ManagementAgent::setExchange (Exchange::shared_ptr _exchange) -{ - exchange = _exchange; -} - -void ManagementAgent::addObject (ManagementObject::shared_ptr object) -{ - object->setObjectId (nextObjectId++); - managementObjects.push_back (object); - QPID_LOG(info, "Management Object Added"); -} - -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} - -void ManagementAgent::Periodic::fire () -{ - agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval))); - agent.PeriodicProcessing (); -} - -void ManagementAgent::clientAdded (void) -{ - for (ManagementObjectVector::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = *iter; - object->setAllChanged (); - object->setSchemaNeeded (); - } -} - -void ManagementAgent::PeriodicProcessing (void) -{ -#define BUFSIZE 65536 -#define THRESHOLD 16384 - char msgChars[BUFSIZE]; - Buffer msgBuffer (msgChars, BUFSIZE); - uint32_t contentSize; - std::list<uint32_t> deleteList; - - if (managementObjects.empty ()) - return; - - Message::shared_ptr msg (new Message ()); - - // Build the magic number for the management message. - msgBuffer.putOctet ('A'); - msgBuffer.putOctet ('M'); - msgBuffer.putOctet ('0'); - msgBuffer.putOctet ('1'); - - for (uint32_t idx = 0; idx < managementObjects.size (); idx++) - { - ManagementObject::shared_ptr object = managementObjects[idx]; - - if (object->getSchemaNeeded ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('S'); // opcode = Schema Record - msgBuffer.putOctet (0); // content-class = N/A - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeSchema (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->getConfigChanged ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('C'); // content-class = Configuration - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeConfig (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->getInstChanged ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('I'); // content-class = Instrumentation - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeInstrumentation (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->isDeleted ()) - deleteList.push_back (idx); - - // Temporary protection against buffer overrun. - // This needs to be replaced with frame fragmentation. - if (msgBuffer.available () < THRESHOLD) - break; - } - - msgBuffer.putOctet ('X'); // End-of-message - msgBuffer.putOctet (0); - msgBuffer.putShort (0); - msgBuffer.putLong (8); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - - AMQFrame method (0, MessageTransferBody(ProtocolVersion(), - 0, "qpid.management", 0, 0)); - AMQFrame header (0, AMQHeaderBody()); - AMQFrame content; - - content.setBody(AMQContentBody()); - content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize); - - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(contentSize); - msg->getFrames().append(content); - - DeliverableMessage deliverable (msg); - exchange->route (deliverable, "mgmt", 0); - - // Delete flagged objects - for (std::list<uint32_t>::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); - iter++) - { - managementObjects.erase (managementObjects.begin () + *iter); - } - deleteList.clear (); -} - -void ManagementAgent::dispatchCommand (Deliverable& /*msg*/, - const string& routingKey, - const FieldTable* /*args*/) -{ - size_t pos, start; - - if (routingKey.compare (0, 7, "method.") != 0) - { - QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey); - return; - } - - start = 7; - if (routingKey.length () == start) - { - QPID_LOG (debug, "Missing class-key in routing key: " << routingKey); - return; - } - - pos = routingKey.find ('.', start); - if (pos == string::npos || routingKey.length () == pos + 1) - { - QPID_LOG (debug, "Missing method-key in routing key: " << routingKey); - return; - } - - string className = routingKey.substr (start, pos - start); - - start = pos + 1; - - string methodName = routingKey.substr (start, routingKey.length () - start); - - QPID_LOG (debug, "Dispatch class: " << className << ", method: " << methodName); -} - diff --git a/cpp/src/qpid/broker/management/ManagementAgent.h b/cpp/src/qpid/broker/management/ManagementAgent.h index e699228601..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementAgent.h +++ b/cpp/src/qpid/broker/management/ManagementAgent.h @@ -1,74 +0,0 @@ -#ifndef _ManagementAgent_ -#define _ManagementAgent_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/Options.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Timer.h" -#include "ManagementObject.h" -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace broker { - - -class ManagementAgent -{ - public: - - typedef boost::shared_ptr<ManagementAgent> shared_ptr; - - ManagementAgent (uint16_t interval); - - void setExchange (Exchange::shared_ptr exchange); - void addObject (ManagementObject::shared_ptr object); - void clientAdded (void); - void dispatchCommand (Deliverable& msg, - const string& routingKey, - const qpid::framing::FieldTable* args); - - private: - - struct Periodic : public TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - ~Periodic () {} - void fire (); - }; - - ManagementObjectVector managementObjects; - Timer timer; - Exchange::shared_ptr exchange; - uint16_t interval; - uint32_t nextObjectId; - - void PeriodicProcessing (void); -}; - -}} - - - -#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/broker/management/ManagementExchange.cpp b/cpp/src/qpid/broker/management/ManagementExchange.cpp index 5d829477ba..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementExchange.cpp +++ b/cpp/src/qpid/broker/management/ManagementExchange.cpp @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementExchange.h" -#include "qpid/log/Statement.h" - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -ManagementExchange::ManagementExchange (const string& _name) : - Exchange (_name), TopicExchange(_name) {} -ManagementExchange::ManagementExchange (const std::string& _name, - bool _durable, - const FieldTable& _args) : - Exchange (_name, _durable, _args), - TopicExchange(_name, _durable, _args) {} - - -bool ManagementExchange::bind (Queue::shared_ptr queue, - const string& routingKey, - const FieldTable* args) -{ - bool result = TopicExchange::bind (queue, routingKey, args); - - // Notify the management agent that a new management client has bound to the - // exchange. - if (result) - managementAgent->clientAdded (); - - return result; -} - -void ManagementExchange::route (Deliverable& msg, - const string& routingKey, - const FieldTable* args) -{ - // Intercept management commands - if (routingKey.length () > 7 && - routingKey.substr (0, 7).compare ("method.") == 0) - { - managementAgent->dispatchCommand (msg, routingKey, args); - return; - } - - TopicExchange::route (msg, routingKey, args); -} - -void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) -{ - managementAgent = agent; -} - - -ManagementExchange::~ManagementExchange() {} - -const std::string ManagementExchange::typeName("management"); - diff --git a/cpp/src/qpid/broker/management/ManagementExchange.h b/cpp/src/qpid/broker/management/ManagementExchange.h index c38f38d0a1..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementExchange.h +++ b/cpp/src/qpid/broker/management/ManagementExchange.h @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ManagementExchange_ -#define _ManagementExchange_ - -#include "qpid/broker/TopicExchange.h" -#include "ManagementAgent.h" - -namespace qpid { -namespace broker { - -class ManagementExchange : public virtual TopicExchange -{ - private: - ManagementAgent::shared_ptr managementAgent; - - public: - static const std::string typeName; - - ManagementExchange (const string& name); - ManagementExchange (const string& _name, bool _durable, - const qpid::framing::FieldTable& _args); - - virtual std::string getType() const { return typeName; } - - virtual bool bind (Queue::shared_ptr queue, - const string& routingKey, - const qpid::framing::FieldTable* args); - - virtual void route (Deliverable& msg, - const string& routingKey, - const qpid::framing::FieldTable* args); - - void setManagmentAgent (ManagementAgent::shared_ptr agent); - - virtual ~ManagementExchange(); -}; - - -} -} - -#endif diff --git a/cpp/src/qpid/broker/management/ManagementObject.cpp b/cpp/src/qpid/broker/management/ManagementObject.cpp index 1c693d6b92..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObject.cpp +++ b/cpp/src/qpid/broker/management/ManagementObject.cpp @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" - -using namespace qpid::framing; -using namespace qpid::broker; -using namespace qpid::sys; - -void ManagementObject::schemaItem (Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig, - bool isIndex) -{ - uint8_t flags = - (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); - - buf.putOctet (flags); - buf.putOctet (typeCode); - buf.putShortString (name); - buf.putShortString (description); -} - -void ManagementObject::schemaListBegin (Buffer& buf) -{ - schemaItem (buf, TYPE_UINT32, "id", "Object ID", true, true); -} - -void ManagementObject::schemaListEnd (Buffer& buf) -{ - buf.putOctet (FLAG_END); -} - -void ManagementObject::writeTimestamps (Buffer& buf) -{ - buf.putLongLong (uint64_t (Duration (now ()))); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); - buf.putLong (objectId); -} diff --git a/cpp/src/qpid/broker/management/ManagementObject.h b/cpp/src/qpid/broker/management/ManagementObject.h index a39ae80c4f..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObject.h +++ b/cpp/src/qpid/broker/management/ManagementObject.h @@ -1,118 +0,0 @@ -#ifndef _ManagementObject_ -#define _ManagementObject_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" -#include <qpid/framing/Buffer.h> -#include <boost/shared_ptr.hpp> -#include <vector> - -namespace qpid { -namespace broker { - -const uint16_t OBJECT_SYSTEM = 1; -const uint16_t OBJECT_BROKER = 2; -const uint16_t OBJECT_VHOST = 3; -const uint16_t OBJECT_QUEUE = 4; -const uint16_t OBJECT_EXCHANGE = 5; -const uint16_t OBJECT_BINDING = 6; -const uint16_t OBJECT_CLIENT = 7; -const uint16_t OBJECT_SESSION = 8; -const uint16_t OBJECT_DESTINATION = 9; -const uint16_t OBJECT_PRODUCER = 10; -const uint16_t OBJECT_CONSUMER = 11; - - -class ManagementObject -{ - protected: - - uint64_t createTime; - uint64_t destroyTime; - uint32_t objectId; - bool configChanged; - bool instChanged; - bool deleted; - - static const uint8_t TYPE_UINT8 = 1; - static const uint8_t TYPE_UINT16 = 2; - static const uint8_t TYPE_UINT32 = 3; - static const uint8_t TYPE_UINT64 = 4; - static const uint8_t TYPE_BOOL = 5; - static const uint8_t TYPE_STRING = 6; - - static const uint8_t FLAG_CONFIG = 0x01; - static const uint8_t FLAG_INDEX = 0x02; - static const uint8_t FLAG_END = 0x80; - - void schemaItem (qpid::framing::Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig = false, - bool isIndex = false); - void schemaListBegin (qpid::framing::Buffer& buf); - void schemaListEnd (qpid::framing::Buffer& buf); - void writeTimestamps (qpid::framing::Buffer& buf); - - public: - typedef boost::shared_ptr<ManagementObject> shared_ptr; - - ManagementObject () : destroyTime(0), objectId (), configChanged(true), - instChanged(true), deleted(false) - { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); } - virtual ~ManagementObject () {} - - virtual uint16_t getObjectType (void) = 0; - virtual std::string getObjectName (void) = 0; - virtual void writeSchema (qpid::framing::Buffer& buf) = 0; - virtual void writeConfig (qpid::framing::Buffer& buf) = 0; - virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0; - virtual bool getSchemaNeeded (void) = 0; - virtual void setSchemaNeeded (void) = 0; - - void setObjectId (uint32_t oid) { objectId = oid; } - uint32_t getObjectId (void) { return objectId; } - inline bool getConfigChanged (void) { return configChanged; } - virtual bool getInstChanged (void) { return instChanged; } - inline void setAllChanged (void) - { - configChanged = true; - instChanged = true; - } - - inline void resourceDestroy (void) { - destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); - deleted = true; - } - bool isDeleted (void) { return deleted; } - -}; - - typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector; - -}} - - - -#endif /*!_ManagementObject_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp b/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp index 139ea0acca..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp +++ b/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "config.h" -#include "qpid/broker/Broker.h" -#include "ManagementObjectBroker.h" - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; - -bool ManagementObjectBroker::schemaNeeded = true; - -ManagementObjectBroker::ManagementObjectBroker (const Options& _conf) -{ - Broker::Options& conf = (Broker::Options&) _conf; - - sysId = "sysId"; - port = conf.port; - workerThreads = conf.workerThreads; - maxConns = conf.maxConnections; - connBacklog = conf.connectionBacklog; - stagingThreshold = conf.stagingThreshold; - storeLib = conf.store; - asyncStore = conf.storeAsync; - mgmtPubInterval = conf.mgmtPubInterval; - initialDiskPageSize = 0; - initialPagesPerQueue = 0; - clusterName = ""; - version = PACKAGE_VERSION; -} - -ManagementObjectBroker::~ManagementObjectBroker () {} - -void ManagementObjectBroker::writeSchema (Buffer& buf) -{ - schemaNeeded = false; - - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true); - schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true); - schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true); - schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true); - schemaItem (buf, TYPE_UINT16, "connBacklog", - "Connection backlog limit for listening socket", true); - schemaItem (buf, TYPE_UINT32, "stagingThreshold", - "Broker stages messages over this size to disk", true); - schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true); - schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true); - schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true); - schemaItem (buf, TYPE_UINT32, "initialDiskPageSize", - "Number of disk pages allocated for storage", true); - schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue", - "Number of disk pages allocated per queue", true); - schemaItem (buf, TYPE_STRING, "clusterName", - "Name of cluster this server is a member of, zero-length for standalone server", true); - schemaItem (buf, TYPE_STRING, "version", "Running software version", true); - schemaListEnd (buf); -} - -void ManagementObjectBroker::writeConfig (Buffer& buf) -{ - configChanged = false; - - writeTimestamps (buf); - buf.putLong (0); - buf.putShort (port); - buf.putShort (workerThreads); - buf.putShort (maxConns); - buf.putShort (connBacklog); - buf.putLong (stagingThreshold); - buf.putShortString (storeLib); - buf.putOctet (asyncStore ? 1 : 0); - buf.putShort (mgmtPubInterval); - buf.putLong (initialDiskPageSize); - buf.putLong (initialPagesPerQueue); - buf.putShortString (clusterName); - buf.putShortString (version); -} - diff --git a/cpp/src/qpid/broker/management/ManagementObjectBroker.h b/cpp/src/qpid/broker/management/ManagementObjectBroker.h index 25d6bf49b0..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectBroker.h +++ b/cpp/src/qpid/broker/management/ManagementObjectBroker.h @@ -1,73 +0,0 @@ -#ifndef _ManagementObjectBroker_ -#define _ManagementObjectBroker_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" -#include "qpid/Options.h" -#include "boost/shared_ptr.hpp" - -namespace qpid { -namespace broker { - -class ManagementObjectBroker : public ManagementObject -{ - public: - - typedef boost::shared_ptr<ManagementObjectBroker> shared_ptr; - - ManagementObjectBroker (const Options& conf); - ~ManagementObjectBroker (void); - - private: - - static bool schemaNeeded; - - std::string sysId; - uint16_t port; - uint16_t workerThreads; - uint16_t maxConns; - uint16_t connBacklog; - uint32_t stagingThreshold; - std::string storeLib; - bool asyncStore; - uint16_t mgmtPubInterval; - uint32_t initialDiskPageSize; - uint32_t initialPagesPerQueue; - std::string clusterName; - std::string version; - - uint16_t getObjectType (void) { return OBJECT_BROKER; } - std::string getObjectName (void) { return "broker"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - - inline bool getInstChanged (void) { return false; } -}; - -}} - - -#endif /*!_ManagementObjectBroker_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp index 8d8ff8cbf4..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp +++ b/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp @@ -1,183 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObjectQueue.h" - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; - -bool ManagementObjectQueue::schemaNeeded = true; - -ManagementObjectQueue::ManagementObjectQueue (uint32_t _vhostRef, std::string& _name, - bool _durable, bool _autoDelete) : - vhostRef(_vhostRef), name(_name), durable(_durable), autoDelete(_autoDelete) -{ - msgTotalEnqueues = 0; - msgTotalDequeues = 0; - msgTxEnqueues = 0; - msgTxDequeues = 0; - msgPersistEnqueues = 0; - msgPersistDequeues = 0; - - msgDepth = 0; - msgDepthLow = 0; - msgDepthHigh = 0; - - byteTotalEnqueues = 0; - byteTotalDequeues = 0; - byteTxEnqueues = 0; - byteTxDequeues = 0; - bytePersistEnqueues = 0; - bytePersistDequeues = 0; - - byteDepth = 0; - byteDepthLow = 0; - byteDepthHigh = 0; - - enqueueTxStarts = 0; - enqueueTxCommits = 0; - enqueueTxRejects = 0; - dequeueTxStarts = 0; - dequeueTxCommits = 0; - dequeueTxRejects = 0; - - enqueueTxCount = 0; - enqueueTxCountLow = 0; - enqueueTxCountHigh = 0; - - dequeueTxCount = 0; - dequeueTxCountLow = 0; - dequeueTxCountHigh = 0; - - consumers = 0; - consumersLow = 0; - consumersHigh = 0; -} - -ManagementObjectQueue::~ManagementObjectQueue () {} - -void ManagementObjectQueue::writeSchema (Buffer& buf) -{ - schemaNeeded = false; - - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT32, "vhostRef", "Virtual Host Ref", true); - schemaItem (buf, TYPE_STRING, "name", "Queue Name", true); - schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); - schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); - schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); - schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues", "Transactional messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgTxnDequeues", "Transactional messages dequeued"); - schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); - schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); - schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); - schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); - schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); - schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); - schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues", "Transactional messages enqueued"); - schemaItem (buf, TYPE_UINT64, "byteTxnDequeues", "Transactional messages dequeued"); - schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); - schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); - schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); - schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); - schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); - schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts", "Total enqueue transactions started "); - schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits", "Total enqueue transactions committed"); - schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects", "Total enqueue transactions rejected"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCount", "Current pending enqueue transactions"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow", "Low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this interval"); - schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts", "Total dequeue transactions started "); - schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits", "Total dequeue transactions committed"); - schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects", "Total dequeue transactions rejected"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCount", "Current pending dequeue transactions"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow", "Transaction low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high water mark this interval"); - schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); - schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); - schemaListEnd (buf); -} - -void ManagementObjectQueue::writeConfig (Buffer& buf) -{ - configChanged = false; - - writeTimestamps (buf); - buf.putLong (vhostRef); - buf.putShortString (name); - buf.putOctet (durable ? 1 : 0); - buf.putOctet (autoDelete ? 1 : 0); -} - -void ManagementObjectQueue::writeInstrumentation (Buffer& buf) -{ - instChanged = false; - - writeTimestamps (buf); - buf.putLongLong (msgTotalEnqueues); - buf.putLongLong (msgTotalDequeues); - buf.putLongLong (msgTxEnqueues); - buf.putLongLong (msgTxDequeues); - buf.putLongLong (msgPersistEnqueues); - buf.putLongLong (msgPersistDequeues); - buf.putLong (msgDepth); - buf.putLong (msgDepthLow); - buf.putLong (msgDepthHigh); - buf.putLongLong (byteTotalEnqueues); - buf.putLongLong (byteTotalDequeues); - buf.putLongLong (byteTxEnqueues); - buf.putLongLong (byteTxDequeues); - buf.putLongLong (bytePersistEnqueues); - buf.putLongLong (bytePersistDequeues); - buf.putLong (byteDepth); - buf.putLong (byteDepthLow); - buf.putLong (byteDepthHigh); - buf.putLongLong (enqueueTxStarts); - buf.putLongLong (enqueueTxCommits); - buf.putLongLong (enqueueTxRejects); - buf.putLong (enqueueTxCount); - buf.putLong (enqueueTxCountLow); - buf.putLong (enqueueTxCountHigh); - buf.putLongLong (dequeueTxStarts); - buf.putLongLong (dequeueTxCommits); - buf.putLongLong (dequeueTxRejects); - buf.putLong (dequeueTxCount); - buf.putLong (dequeueTxCountLow); - buf.putLong (dequeueTxCountHigh); - buf.putLong (consumers); - buf.putLong (consumersLow); - buf.putLong (consumersHigh); - - msgDepthLow = msgDepth; - msgDepthHigh = msgDepth; - byteDepthLow = byteDepth; - byteDepthHigh = byteDepth; - enqueueTxCountLow = enqueueTxCount; - enqueueTxCountHigh = enqueueTxCount; - dequeueTxCountLow = dequeueTxCount; - dequeueTxCountHigh = dequeueTxCount; - consumersLow = consumers; - consumersHigh = consumers; -} diff --git a/cpp/src/qpid/broker/management/ManagementObjectQueue.h b/cpp/src/qpid/broker/management/ManagementObjectQueue.h index cedbf81809..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectQueue.h +++ b/cpp/src/qpid/broker/management/ManagementObjectQueue.h @@ -1,181 +0,0 @@ -#ifndef _ManagementObjectQueue_ -#define _ManagementObjectQueue_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" - -namespace qpid { -namespace broker { - -const uint32_t MSG_MASK_TX = 1; // Transactional message -const uint32_t MSG_MASK_PERSIST = 2; // Persistent message - -class ManagementObjectQueue : public ManagementObject -{ - private: - - static bool schemaNeeded; - - uint32_t vhostRef; - std::string name; - bool durable; - bool autoDelete; - - uint64_t msgTotalEnqueues; // Total messages enqueued - uint64_t msgTotalDequeues; // Total messages dequeued - uint64_t msgTxEnqueues; // Transactional messages enqueued - uint64_t msgTxDequeues; // Transactional messages dequeued - uint64_t msgPersistEnqueues; // Persistent messages enqueued - uint64_t msgPersistDequeues; // Persistent messages dequeued - - uint32_t msgDepth; // Current size of queue in messages - uint32_t msgDepthLow; // Low-water queue size, this interval - uint32_t msgDepthHigh; // High-water queue size, this interval - - uint64_t byteTotalEnqueues; // Total messages enqueued - uint64_t byteTotalDequeues; // Total messages dequeued - uint64_t byteTxEnqueues; // Transactional messages enqueued - uint64_t byteTxDequeues; // Transactional messages dequeued - uint64_t bytePersistEnqueues; // Persistent messages enqueued - uint64_t bytePersistDequeues; // Persistent messages dequeued - - uint32_t byteDepth; // Current size of queue in bytes - uint32_t byteDepthLow; // Low-water mark this interval - uint32_t byteDepthHigh; // High-water mark this interval - - uint64_t enqueueTxStarts; // Total enqueue transactions started - uint64_t enqueueTxCommits; // Total enqueue transactions committed - uint64_t enqueueTxRejects; // Total enqueue transactions rejected - - uint32_t enqueueTxCount; // Current pending enqueue transactions - uint32_t enqueueTxCountLow; // Low water mark this interval - uint32_t enqueueTxCountHigh; // High water mark this interval - - uint64_t dequeueTxStarts; // Total dequeue transactions started - uint64_t dequeueTxCommits; // Total dequeue transactions committed - uint64_t dequeueTxRejects; // Total dequeue transactions rejected - - uint32_t dequeueTxCount; // Current pending dequeue transactions - uint32_t dequeueTxCountLow; // Low water mark this interval - uint32_t dequeueTxCountHigh; // High water mark this interval - - uint32_t consumers; // Current consumers on queue - uint32_t consumersLow; // Low water mark this interval - uint32_t consumersHigh; // High water mark this interval - - uint16_t getObjectType (void) { return OBJECT_QUEUE; } - std::string getObjectName (void) { return "queue"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& buf); - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - - inline void adjustQueueHiLo (void){ - if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; - if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; - - if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; - if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; - instChanged = true; - } - - inline void adjustTxHiLo (void){ - if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; - if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; - if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; - if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; - instChanged = true; - } - - inline void adjustConsumerHiLo (void){ - if (consumers > consumersHigh) consumersHigh = consumers; - if (consumers < consumersLow) consumersLow = consumers; - instChanged = true; - } - - public: - - typedef boost::shared_ptr<ManagementObjectQueue> shared_ptr; - - ManagementObjectQueue (uint32_t _vhostRef, std::string& name, - bool durable, bool autoDelete); - ~ManagementObjectQueue (void); - - // The following mask contents are used to describe enqueued or dequeued - // messages when counting statistics. - - inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ - msgTotalEnqueues++; - byteTotalEnqueues += bytes; - - if (attrMask & MSG_MASK_TX){ - msgTxEnqueues++; - byteTxEnqueues += bytes; - } - - if (attrMask & MSG_MASK_PERSIST){ - msgPersistEnqueues++; - bytePersistEnqueues += bytes; - } - - msgDepth++; - byteDepth += bytes; - adjustQueueHiLo (); - } - - inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ - msgTotalDequeues++; - byteTotalDequeues += bytes; - - if (attrMask & MSG_MASK_TX){ - msgTxDequeues++; - byteTxDequeues += bytes; - } - - if (attrMask & MSG_MASK_PERSIST){ - msgPersistDequeues++; - bytePersistDequeues += bytes; - } - - msgDepth--; - byteDepth -= bytes; - adjustQueueHiLo (); - } - - inline void incConsumers (void){ - consumers++; - adjustConsumerHiLo (); - } - - inline void decConsumers (void){ - consumers--; - adjustConsumerHiLo (); - } -}; - -}} - - - -#endif /*!_ManagementObjectQueue_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp b/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp index b5c8b3b0b7..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp +++ b/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Broker.h" -#include "ManagementObjectVhost.h" - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; - -bool ManagementObjectVhost::schemaNeeded = true; - -ManagementObjectVhost::ManagementObjectVhost (uint32_t _sysRef, const Options& /*_conf*/) : - sysRef(_sysRef), name("/") {} - -ManagementObjectVhost::~ManagementObjectVhost () {} - -void ManagementObjectVhost::writeSchema (Buffer& buf) -{ - schemaNeeded = false; - - schemaListBegin (buf); - schemaItem (buf, TYPE_UINT32, "brokerRef", "Broker Reference" , true); - schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true); - schemaListEnd (buf); -} - -void ManagementObjectVhost::writeConfig (Buffer& buf) -{ - configChanged = false; - - writeTimestamps (buf); - buf.putLong (sysRef); - buf.putShortString (name); -} - diff --git a/cpp/src/qpid/broker/management/ManagementObjectVhost.h b/cpp/src/qpid/broker/management/ManagementObjectVhost.h index c36acb3487..e69de29bb2 100644 --- a/cpp/src/qpid/broker/management/ManagementObjectVhost.h +++ b/cpp/src/qpid/broker/management/ManagementObjectVhost.h @@ -1,62 +0,0 @@ -#ifndef _ManagementObjectVhost_ -#define _ManagementObjectVhost_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" -#include "qpid/Options.h" -#include "boost/shared_ptr.hpp" - -namespace qpid { -namespace broker { - -class ManagementObjectVhost : public ManagementObject -{ - public: - - typedef boost::shared_ptr<ManagementObjectVhost> shared_ptr; - - ManagementObjectVhost (uint32_t sysRef, const Options& conf); - ~ManagementObjectVhost (void); - - private: - - static bool schemaNeeded; - - uint32_t sysRef; - std::string name; - - uint16_t getObjectType (void) { return OBJECT_VHOST; } - std::string getObjectName (void) { return "vhost"; } - void writeSchema (qpid::framing::Buffer& buf); - void writeConfig (qpid::framing::Buffer& buf); - void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {} - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - - inline bool getInstChanged (void) { return false; } -}; - -}} - - -#endif /*!_ManagementObjectVhost_*/ |
