From a1b440e5393206ec5833e2d6c2617c2aca71701f Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Mon, 11 May 2009 14:16:52 +0000 Subject: QPID-1843 - Cleaned up the interface to the broker's internal management agent. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@773570 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 6 +- cpp/src/qpid/acl/Acl.cpp | 2 +- cpp/src/qpid/acl/Acl.h | 2 +- cpp/src/qpid/broker/Bridge.cpp | 6 +- cpp/src/qpid/broker/Broker.cpp | 23 +- cpp/src/qpid/broker/Broker.h | 5 +- cpp/src/qpid/broker/Connection.cpp | 2 +- cpp/src/qpid/broker/Connection.h | 2 +- cpp/src/qpid/broker/DirectExchange.cpp | 6 +- cpp/src/qpid/broker/DirectExchange.h | 4 +- cpp/src/qpid/broker/Exchange.cpp | 58 +- cpp/src/qpid/broker/Exchange.h | 7 +- cpp/src/qpid/broker/ExchangeRegistry.cpp | 10 +- cpp/src/qpid/broker/ExchangeRegistry.h | 4 +- cpp/src/qpid/broker/FanOutExchange.cpp | 8 +- cpp/src/qpid/broker/FanOutExchange.h | 4 +- cpp/src/qpid/broker/HeadersExchange.cpp | 8 +- cpp/src/qpid/broker/HeadersExchange.h | 4 +- cpp/src/qpid/broker/Link.cpp | 6 +- cpp/src/qpid/broker/Link.h | 2 +- cpp/src/qpid/broker/Queue.cpp | 18 +- cpp/src/qpid/broker/Queue.h | 4 +- cpp/src/qpid/broker/QueueRegistry.cpp | 6 +- cpp/src/qpid/broker/QueueRegistry.h | 3 +- cpp/src/qpid/broker/SessionAdapter.cpp | 18 +- cpp/src/qpid/broker/SessionState.cpp | 8 +- cpp/src/qpid/broker/System.cpp | 7 +- cpp/src/qpid/broker/System.h | 4 +- cpp/src/qpid/broker/TopicExchange.cpp | 6 +- cpp/src/qpid/broker/TopicExchange.h | 4 +- cpp/src/qpid/broker/Vhost.cpp | 9 +- cpp/src/qpid/broker/Vhost.h | 3 +- cpp/src/qpid/cluster/Cluster.cpp | 4 +- cpp/src/qpid/cluster/ClusterPlugin.cpp | 5 +- cpp/src/qpid/management/ManagementAgent.cpp | 1131 +++++++++++++++++++++++ cpp/src/qpid/management/ManagementAgent.h | 248 +++++ cpp/src/qpid/management/ManagementBroker.cpp | 1161 ------------------------ cpp/src/qpid/management/ManagementBroker.h | 242 ----- cpp/src/qpid/management/ManagementExchange.cpp | 12 +- cpp/src/qpid/management/ManagementExchange.h | 10 +- cpp/src/qpid/management/ManagementObject.cpp | 4 +- cpp/src/qpid/management/ManagementObject.h | 10 +- 42 files changed, 1534 insertions(+), 1552 deletions(-) create mode 100644 cpp/src/qpid/management/ManagementAgent.cpp create mode 100644 cpp/src/qpid/management/ManagementAgent.h delete mode 100644 cpp/src/qpid/management/ManagementBroker.cpp delete mode 100644 cpp/src/qpid/management/ManagementBroker.h (limited to 'cpp/src') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 4d2d375802..63ca7009d9 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -87,7 +87,7 @@ $(rgen_generator): # Management generator. mgen_dir=$(top_srcdir)/managementgen -mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk -q -o gen/qmf \ +mgen_cmd=$(mgen_dir)/qmf-gen -m $(srcdir)/managementgen.mk -q -b -o gen/qmf \ $(top_srcdir)/../specs/management-schema.xml \ $(srcdir)/qpid/acl/management-schema.xml \ $(srcdir)/qpid/cluster/management-schema.xml @@ -427,7 +427,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/TxBuffer.cpp \ qpid/broker/TxPublish.cpp \ qpid/broker/Vhost.cpp \ - qpid/management/ManagementBroker.cpp \ + qpid/management/ManagementAgent.cpp \ qpid/management/ManagementExchange.cpp \ qpid/sys/TCPIOPlugin.cpp @@ -679,7 +679,7 @@ nobase_include_HEADERS = \ qpid/management/Args.h \ qpid/management/IdAllocator.h \ qpid/management/Manageable.h \ - qpid/management/ManagementBroker.h \ + qpid/management/ManagementAgent.h \ qpid/management/ManagementEvent.h \ qpid/management/ManagementExchange.h \ qpid/management/ManagementObject.h \ diff --git a/cpp/src/qpid/acl/Acl.cpp b/cpp/src/qpid/acl/Acl.cpp index 8c128e7bb9..fe2644c136 100644 --- a/cpp/src/qpid/acl/Acl.cpp +++ b/cpp/src/qpid/acl/Acl.cpp @@ -46,7 +46,7 @@ namespace _qmf = qmf::org::apache::qpid::acl; Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false) { - agent = ManagementAgent::Singleton::getInstance(); + agent = broker->getManagementAgent(); if (agent != 0){ _qmf::Package packageInit(agent); diff --git a/cpp/src/qpid/acl/Acl.h b/cpp/src/qpid/acl/Acl.h index 7770843e87..e153187b3d 100644 --- a/cpp/src/qpid/acl/Acl.h +++ b/cpp/src/qpid/acl/Acl.h @@ -26,7 +26,7 @@ #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" #include "qpid/management/Manageable.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/acl/Acl.h" #include diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 4d275b958f..e629a20e87 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -24,7 +24,7 @@ #include "LinkRegistry.h" #include "SessionState.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" @@ -64,7 +64,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, std::stringstream title; title << id << "_" << link->getBroker()->getFederationTag(); queueName += title.str(); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Bridge (agent, this, link, id, args.i_durable, args.i_src, args.i_dest, @@ -181,7 +181,7 @@ void Bridge::destroy() void Bridge::setPersistenceId(uint64_t pId) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = link->getBroker()->getManagementAgent(); agent->addObject (mgmtObject, pId); } persistenceId = pId; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index c43eca6e5b..749489fbfd 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -65,7 +65,7 @@ using qpid::sys::Dispatcher; using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; -using qpid::management::ManagementBroker; +using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -136,10 +136,11 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), - managementAgentSingleton(!config.enableMgmt), store(0), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), + queues(this), + exchanges(this), links(this), factory(new SecureConnectionFactory(*this)), dtxManager(timer), @@ -148,6 +149,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), + managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), queueCleaner(queues, timer), queueEvents(poller), recovery(true), @@ -156,13 +158,11 @@ Broker::Broker(const Broker::Options& conf) : { if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); - managementAgent = managementAgentSingleton.getInstance(); - ((ManagementBroker*) managementAgent)->configure - (dataDir.isEnabled() ? dataDir.getPath() : string(), - conf.mgmtPubInterval, this, conf.workerThreads + 3); + managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), + conf.mgmtPubInterval, this, conf.workerThreads + 3); _qmf::Package packageInitializer(managementAgent); - System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string()); + System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); systemObject = System::shared_ptr(system); mgmtObject = new _qmf::Broker(managementAgent, this, system, conf.port); @@ -182,9 +182,9 @@ Broker::Broker(const Broker::Options& conf) : // Since there is currently no support for virtual hosts, a placeholder object // representing the implied single virtual host is added here to keep the // management schema correct. - Vhost* vhost = new Vhost(this); + Vhost* vhost = new Vhost(this, this); vhostObject = Vhost::shared_ptr(vhost); - framing::Uuid uuid(((ManagementBroker*) managementAgent)->getUuid()); + framing::Uuid uuid(managementAgent->getUuid()); federationTag = uuid.str(); vhostObject->setFederationTag(federationTag); @@ -238,9 +238,8 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(qpid_management, ManagementExchange::typeName); Exchange::shared_ptr mExchange = exchanges.get (qpid_management); Exchange::shared_ptr dExchange = exchanges.get (amq_direct); - ((ManagementBroker*) managementAgent)->setExchange (mExchange, dExchange); - boost::dynamic_pointer_cast(mExchange)->setManagmentAgent - ((ManagementBroker*) managementAgent); + managementAgent->setExchange(mExchange, dExchange); + boost::dynamic_pointer_cast(mExchange)->setManagmentAgent(managementAgent); } else QPID_LOG(info, "Management not enabled"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 5a1529a3ba..8f4621bb39 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -39,7 +39,7 @@ #include "Timer.h" #include "ExpiryPolicy.h" #include "qpid/management/Manageable.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h" #include "qpid/Options.h" @@ -120,7 +120,6 @@ public: boost::shared_ptr poller; Options config; - management::ManagementAgent::Singleton managementAgentSingleton; ProtocolFactoryMap protocolFactories; std::auto_ptr store; AclModule* acl; @@ -235,6 +234,8 @@ public: void setRecovery(bool set) { recovery = set; } bool getRecovery() const { return recovery; } + + management::ManagementAgent* getManagementAgent() { return managementAgent; } }; }} diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 365b3ccbeb..22188054a6 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -67,7 +67,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std if (parent != 0) { - agent = ManagementAgent::Singleton::getInstance(); + agent = broker_.getManagementAgent(); // TODO set last bool true if system connection diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index e67cdce681..770bf2184f 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -40,7 +40,7 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/ProtocolVersion.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/management/Manageable.h" #include "qpid/ptr_map.h" #include "qpid/sys/AggregateOutput.h" diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index d1d9ad07e4..deb9699c96 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -41,15 +41,15 @@ const std::string fedOpReorigin("R"); const std::string fedOpHello("H"); } -DirectExchange::DirectExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +DirectExchange::DirectExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type(typeName); } DirectExchange::DirectExchange(const string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type(typeName); diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 27d101c4fe..9081c319c0 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -46,11 +46,11 @@ public: static const std::string typeName; QPID_BROKER_EXTERN DirectExchange(const std::string& name, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN DirectExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index dd1fe98b2c..acedd1f91a 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -21,8 +21,8 @@ #include "Exchange.h" #include "ExchangeRegistry.h" -#include "qpid/agent/ManagementAgent.h" -#include "qpid/management/ManagementBroker.h" +#include "Broker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" #include "DeliverableMessage.h" @@ -33,7 +33,6 @@ using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::sys::Mutex; using qpid::management::ManagementAgent; -using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -83,13 +82,13 @@ void Exchange::routeIVE(){ } -Exchange::Exchange (const string& _name, Manageable* parent) : - name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0) +Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : + name(_name), durable(false), persistenceId(0), sequence(false), + sequenceNo(0), ive(false), mgmtExchange(0), broker(b) { - if (parent != 0) + if (parent != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); @@ -101,13 +100,13 @@ Exchange::Exchange (const string& _name, Manageable* parent) : static const std::string QPID_MANAGEMENT("qpid.management"); Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - Manageable* parent) + Manageable* parent, Broker* b) : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), - args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0) + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b) { - if (parent != 0) + if (parent != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); @@ -118,8 +117,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } else if (name == QPID_MANAGEMENT) { agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID } else { - ManagementBroker* mb = dynamic_cast(agent); - agent->addObject (mgmtExchange, mb ? mb->allocateId(this) : 0); + agent->addObject (mgmtExchange, agent->allocateId(this)); } } } @@ -145,7 +143,7 @@ void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); agent->addObject (mgmtExchange, 0x2000000000000000LL + id); } persistenceId = id; @@ -240,20 +238,22 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang { if (parent != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); - if (agent != 0) - { - ManagementObject* mo = queue->GetManagementObject(); - if (mo != 0) - { - management::ObjectId queueId = mo->getObjectId(); - mgmtBinding = new _qmf::Binding - (agent, this, (Manageable*) parent, queueId, key, args); - if (!origin.empty()) - mgmtBinding->set_origin(origin); - ManagementBroker* mb = dynamic_cast(agent); - agent->addObject (mgmtBinding, mb ? mb->allocateId(this) : 0); - } + Broker* broker = parent->getBroker(); + if (broker != 0) { + ManagementAgent* agent = broker->getManagementAgent(); + if (agent != 0) + { + ManagementObject* mo = queue->GetManagementObject(); + if (mo != 0) + { + management::ObjectId queueId = mo->getObjectId(); + mgmtBinding = new _qmf::Binding + (agent, this, (Manageable*) parent, queueId, key, args); + if (!origin.empty()) + mgmtBinding->set_origin(origin); + agent->addObject (mgmtBinding, agent->allocateId(this)); + } + } } } } diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 47c0bdb3af..e33c0c6bbc 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -121,9 +121,10 @@ protected: public: typedef boost::shared_ptr shared_ptr; - QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0); + QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0, + Broker* broker = 0); QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN virtual ~Exchange(); const std::string& getName() const { return name; } @@ -167,10 +168,12 @@ public: void registerDynamicBridge(DynamicBridge* db); void removeDynamicBridge(DynamicBridge* db); virtual bool supportsDynamicBinding() { return false; } + Broker* getBroker() const { return broker; } protected: qpid::sys::Mutex bridgeLock; std::vector bridgeVector; + Broker* broker; QPID_BROKER_EXTERN virtual void handleHelloRequest(); void propagateFedOp(const std::string& routingKey, const std::string& tags, diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index bb0eec34ba..85bd65e456 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -45,15 +45,15 @@ pair ExchangeRegistry::declare(const string& name, c Exchange::shared_ptr exchange; if(type == TopicExchange::typeName){ - exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new TopicExchange(name, durable, args, parent, broker)); }else if(type == DirectExchange::typeName){ - exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new DirectExchange(name, durable, args, parent, broker)); }else if(type == FanOutExchange::typeName){ - exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker)); }else if (type == HeadersExchange::typeName) { - exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker)); }else if (type == ManagementExchange::typeName) { - exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent)); + exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker)); } else{ FunctionMap::iterator i = factory.find(type); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index 9edd54f025..34ee173a91 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -45,7 +45,7 @@ class ExchangeRegistry{ typedef boost::function4 FactoryFunction; - ExchangeRegistry () : parent(0) {} + ExchangeRegistry (Broker* b = 0) : parent(0), broker(b) {} QPID_BROKER_EXTERN std::pair declare (const std::string& name, const std::string& type); QPID_BROKER_EXTERN std::pair declare @@ -84,7 +84,7 @@ class ExchangeRegistry{ FunctionMap factory; mutable qpid::sys::RWlock lock; management::Manageable* parent; - + Broker* broker; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index aa1f7ff30a..dc3bda4262 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -38,16 +38,16 @@ const std::string fedOpReorigin("R"); const std::string fedOpHello("H"); } -FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent) : - Exchange(_name, _parent) +FanOutExchange::FanOutExchange(const std::string& _name, Manageable* _parent, Broker* b) : + Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index edfc4395f4..32da9fe5b5 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -40,11 +40,11 @@ class FanOutExchange : public virtual Exchange { static const std::string typeName; QPID_BROKER_EXTERN FanOutExchange(const std::string& name, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN FanOutExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 09fb2d9bef..4b1176d560 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -43,16 +43,16 @@ namespace { const std::string empty; } -HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) : - Exchange(_name, _parent) +HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent, Broker* b) : + Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 2b01f9ecae..87633c0f0e 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -61,11 +61,11 @@ class HeadersExchange : public virtual Exchange { static const std::string typeName; QPID_BROKER_EXTERN HeadersExchange(const string& name, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN HeadersExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index dd1a1fa0b4..a2717bfd4c 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -68,9 +68,9 @@ Link::Link(LinkRegistry* _links, connection(0), agent(0) { - if (parent != 0) + if (parent != 0 && broker != 0) { - agent = ManagementAgent::Singleton::getInstance(); + agent = broker->getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable); @@ -347,7 +347,7 @@ void Link::notifyConnectionForced(const string text) void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); agent->addObject(mgmtObject, id); } persistenceId = id; diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index 39014b0ec0..0b504651eb 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -30,7 +30,7 @@ #include "qpid/sys/Mutex.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Link.h" #include diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index aa0cd8ca31..6930275361 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -30,7 +30,7 @@ #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" @@ -48,7 +48,6 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; -using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -80,7 +79,8 @@ const int ENQUEUE_AND_DEQUEUE=2; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, - Manageable* parent) : + Manageable* parent, + Broker* b) : name(_name), autodelete(_autodelete), @@ -98,11 +98,12 @@ Queue::Queue(const string& _name, bool _autodelete, mgmtObject(0), eventMode(0), eventMgr(0), - insertSeqNo(0) + insertSeqNo(0), + broker(b) { - if (parent != 0) + if (parent != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { @@ -111,8 +112,7 @@ Queue::Queue(const string& _name, bool _autodelete, // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. if (store == 0) { - ManagementBroker* mb = dynamic_cast(agent); - agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); + agent->addObject (mgmtObject, agent->allocateId(this)); } } } @@ -838,7 +838,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId); if (externalQueueStore) { diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index c5ef9a9307..0d5f2043d1 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -105,6 +105,7 @@ namespace qpid { QueueEvents* eventMgr; bool insertSeqNo; std::string seqNoKey; + Broker* broker; void push(boost::intrusive_ptr& msg, bool isRecovery=false); void setPolicy(std::auto_ptr policy); @@ -158,7 +159,8 @@ namespace qpid { bool autodelete = false, MessageStore* const store = 0, const OwnershipToken* const owner = 0, - management::Manageable* parent = 0); + management::Manageable* parent = 0, + Broker* broker = 0); QPID_BROKER_EXTERN ~Queue(); QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index d079e543c4..60182e1ead 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -27,8 +27,8 @@ using namespace qpid::broker; using namespace qpid::sys; -QueueRegistry::QueueRegistry() : - counter(1), store(0), events(0), parent(0), lastNode(false) {} +QueueRegistry::QueueRegistry(Broker* b) : + counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {} QueueRegistry::~QueueRegistry(){} @@ -42,7 +42,7 @@ QueueRegistry::declare(const string& declareName, bool durable, QueueMap::iterator i = queues.find(name); if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent)); + Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); queues[name] = queue; if (lastNode) queue->setLastNodeFailure(); if (events) queue->setQueueEventManager(*events); diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 3c02afedc4..a4ea65f18c 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -43,7 +43,7 @@ class QueueEvents; */ class QueueRegistry { public: - QPID_BROKER_EXTERN QueueRegistry(); + QPID_BROKER_EXTERN QueueRegistry(Broker* b = 0); QPID_BROKER_EXTERN ~QueueRegistry(); /** @@ -131,6 +131,7 @@ private: QueueEvents* events; management::Manageable* parent; bool lastNode; //used to set mode on queue declare + Broker* broker; //destroy impl that assumes lock is already held: void destroyLH (const string& name); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 96c47085f0..0ddd546a68 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -23,7 +23,7 @@ #include "qpid/framing/enum.h" #include "qpid/log/Statement.h" #include "qpid/framing/SequenceSet.h" -#include "qpid/agent/ManagementAgent.h" +#include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -98,7 +98,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const checkAlternate(response.first, alternate); } - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, alternateExchange, durable, false, args, @@ -140,7 +140,7 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); getBroker().getExchanges().destroy(name); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventExchangeDelete(getConnection().getUrl(), getConnection().getUserId(), name)); } @@ -181,7 +181,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); } - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments)); } @@ -214,7 +214,7 @@ void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, if (exchange->isDurable() && queue->isDurable()) getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventUnbind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, routingKey)); } @@ -372,7 +372,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } } - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), name, durable, exclusive, autoDelete, arguments, @@ -422,7 +422,7 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse getBroker().getQueues().destroy(queue); q->unbind(getBroker().getExchanges(), q); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDelete(getConnection().getUrl(), getConnection().getUserId(), queue)); } @@ -484,7 +484,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), queueName, destination, exclusive, arguments)); @@ -495,7 +495,7 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) { state.cancel(destination); - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination)); } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 7e5f605753..26a35f4a4f 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -32,7 +32,7 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/AMQP_ClientProxy.h" #include @@ -45,7 +45,6 @@ using namespace framing; using sys::Mutex; using boost::intrusive_ptr; using qpid::management::ManagementAgent; -using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; @@ -73,7 +72,7 @@ SessionState::SessionState( } Manageable* parent = broker.GetVhostObject (); if (parent != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = getBroker().getManagementAgent(); if (agent != 0) { mgmtObject = new _qmf::Session (agent, this, parent, getId().getName()); @@ -81,8 +80,7 @@ SessionState::SessionState( mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate); - ManagementBroker* mb = dynamic_cast(agent); - agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); + agent->addObject (mgmtObject, agent->allocateId(this)); } } attach(h); diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp index a11ad25bbe..86933109a1 100644 --- a/cpp/src/qpid/broker/System.cpp +++ b/cpp/src/qpid/broker/System.cpp @@ -18,7 +18,8 @@ // #include "System.h" -#include "qpid/agent/ManagementAgent.h" +#include "Broker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/SystemInfo.h" #include @@ -29,9 +30,9 @@ using namespace qpid::broker; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; -System::System (string _dataDir) : mgmtObject(0) +System::System (string _dataDir, Broker* broker) : mgmtObject(0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker ? broker->getManagementAgent() : 0; if (agent != 0) { diff --git a/cpp/src/qpid/broker/System.h b/cpp/src/qpid/broker/System.h index 42a816e095..0fc2c2bd88 100644 --- a/cpp/src/qpid/broker/System.h +++ b/cpp/src/qpid/broker/System.h @@ -28,6 +28,8 @@ namespace qpid { namespace broker { +class Broker; + class System : public management::Manageable { private: @@ -38,7 +40,7 @@ class System : public management::Manageable typedef boost::shared_ptr shared_ptr; - System (std::string _dataDir); + System (std::string _dataDir, Broker* broker = 0); management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index d4f9721162..a465c35790 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -137,15 +137,15 @@ bool TopicPattern::match(const Tokens& target) const return do_match(begin(), end(), target.begin(), target.end()); } -TopicExchange::TopicExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent) +TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) : Exchange(_name, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); } TopicExchange::TopicExchange(const std::string& _name, bool _durable, - const FieldTable& _args, Manageable* _parent) : - Exchange(_name, _durable, _args, _parent) + const FieldTable& _args, Manageable* _parent, Broker* b) : + Exchange(_name, _durable, _args, _parent, b) { if (mgmtExchange != 0) mgmtExchange->set_type (typeName); diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 24bf5f7bca..b3ee1ea66d 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -86,11 +86,11 @@ class TopicExchange : public virtual Exchange { static const std::string typeName; QPID_BROKER_EXTERN TopicExchange(const string& name, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN TopicExchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - management::Manageable* parent = 0); + management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } diff --git a/cpp/src/qpid/broker/Vhost.cpp b/cpp/src/qpid/broker/Vhost.cpp index c5bb6c5104..aa7683d318 100644 --- a/cpp/src/qpid/broker/Vhost.cpp +++ b/cpp/src/qpid/broker/Vhost.cpp @@ -18,7 +18,8 @@ // #include "Vhost.h" -#include "qpid/agent/ManagementAgent.h" +#include "Broker.h" +#include "qpid/management/ManagementAgent.h" using namespace qpid::broker; using qpid::management::ManagementAgent; @@ -28,11 +29,11 @@ namespace qpid { namespace management { class Manageable; }} -Vhost::Vhost (qpid::management::Manageable* parentBroker) : mgmtObject(0) +Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmtObject(0) { - if (parentBroker != 0) + if (parentBroker != 0 && broker != 0) { - ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); + ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { diff --git a/cpp/src/qpid/broker/Vhost.h b/cpp/src/qpid/broker/Vhost.h index ef59362e4d..9554d641c2 100644 --- a/cpp/src/qpid/broker/Vhost.h +++ b/cpp/src/qpid/broker/Vhost.h @@ -27,6 +27,7 @@ namespace qpid { namespace broker { +class Broker; class Vhost : public management::Manageable { private: @@ -37,7 +38,7 @@ class Vhost : public management::Manageable typedef boost::shared_ptr shared_ptr; - Vhost (management::Manageable* parentBroker); + Vhost (management::Manageable* parentBroker, Broker* broker = 0); management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 677bd2b722..1f39fe9ae9 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -44,7 +44,7 @@ #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/memory.h" #include "qpid/sys/Thread.h" #include "qpid/sys/LatencyTracker.h" @@ -116,7 +116,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : lastBroker(false), error(*this) { - mAgent = ManagementAgent::Singleton::getInstance(); + mAgent = broker.getManagementAgent(); if (mAgent != 0){ _qmf::Package packageInit(mAgent); mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 66d15fa56b..56c50eafae 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -31,7 +31,7 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/log/Statement.h" -#include "qpid/management/ManagementBroker.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/management/IdAllocator.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/Queue.h" @@ -49,7 +49,6 @@ using namespace std; using broker::Broker; using management::IdAllocator; using management::ManagementAgent; -using management::ManagementBroker; /** Note separating options from settings to work around boost version differences. @@ -140,7 +139,7 @@ struct ClusterPlugin : public Plugin { broker->setConnectionFactory( boost::shared_ptr( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); - ManagementBroker* mgmt = dynamic_cast(ManagementAgent::Singleton::getInstance()); + ManagementAgent* mgmt = broker->getManagementAgent(); if (mgmt) { std::auto_ptr allocator(new UpdateClientIdAllocator()); mgmt->setAllocator(allocator); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp new file mode 100644 index 0000000000..77277070d9 --- /dev/null +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -0,0 +1,1131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementAgent.h" +#include "ManagementObject.h" +#include "IdAllocator.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/sys/Time.h" +#include "qpid/broker/ConnectionState.h" +#include "qpid/broker/AclModule.h" +#include +#include +#include + +using boost::intrusive_ptr; +using qpid::framing::Uuid; +using namespace qpid::framing; +using namespace qpid::management; +using namespace qpid::broker; +using namespace qpid::sys; +using namespace std; +namespace _qmf = qmf::org::apache::qpid::broker; + +ManagementAgent::RemoteAgent::~RemoteAgent () +{ + if (mgmtObject != 0) + mgmtObject->resourceDestroy(); +} + +ManagementAgent::ManagementAgent () : + threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now()))) +{ + nextObjectId = 1; + brokerBank = 1; + bootSequence = 1; + nextRemoteBank = 10; + nextRequestSequence = 1; + clientWasAdded = false; +} + +ManagementAgent::~ManagementAgent () +{ + timer.stop(); + { + Mutex::ScopedLock lock (userLock); + + // Reset the shared pointers to exchanges. If this is not done now, the exchanges + // will stick around until dExchange and mExchange are implicitely destroyed (long + // after this destructor completes). Those exchanges hold references to management + // objects that will be invalid. + dExchange.reset(); + mExchange.reset(); + + moveNewObjectsLH(); + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + ManagementObject* object = iter->second; + delete object; + } + managementObjects.clear(); + } +} + +void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, + qpid::broker::Broker* _broker, int _threads) +{ + dataDir = _dataDir; + interval = _interval; + broker = _broker; + threadPoolSize = _threads; + ManagementObject::maxThreads = threadPoolSize; + timer.add (intrusive_ptr (new Periodic(*this, interval))); + + // Get from file or generate and save to file. + if (dataDir.empty()) + { + uuid.generate(); + QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " + << uuid); + } + else + { + string filename(dataDir + "/.mbrokerdata"); + ifstream inFile(filename.c_str ()); + + if (inFile.good()) + { + inFile >> uuid; + inFile >> bootSequence; + inFile >> nextRemoteBank; + inFile.close(); + QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid); + + // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. + bootSequence++; + if (bootSequence & 0xF000) + bootSequence = 1; + writeData(); + } + else + { + uuid.generate(); + QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); + writeData(); + } + + QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); + } +} + +void ManagementAgent::writeData () +{ + string filename (dataDir + "/.mbrokerdata"); + ofstream outFile (filename.c_str ()); + + if (outFile.good()) + { + outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; + outFile.close(); + } +} + +void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, + qpid::broker::Exchange::shared_ptr _dexchange) +{ + mExchange = _mexchange; + dExchange = _dexchange; +} + +void ManagementAgent::registerClass (const string& packageName, + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); +} + +void ManagementAgent::registerEvent (const string& packageName, + const string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); +} + +ObjectId ManagementAgent::addObject (ManagementObject* object, + uint64_t persistId) +{ + Mutex::ScopedLock lock (addLock); + uint16_t sequence; + uint64_t objectNum; + + if (persistId == 0) { + sequence = bootSequence; + objectNum = nextObjectId++; + } else { + sequence = 0; + objectNum = persistId; + } + + ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); + + object->setObjectId(objId); + newManagementObjects[objId] = object; + return objId; +} + +void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) +{ + Mutex::ScopedLock lock (userLock); + Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; + + encodeHeader(outBuffer, 'e'); + outBuffer.putShortString(event.getPackageName()); + outBuffer.putShortString(event.getEventName()); + outBuffer.putBin128(event.getMd5Sum()); + outBuffer.putLongLong(uint64_t(Duration(now()))); + outBuffer.putOctet(sev); + event.encode(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, + "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); +} + +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), agent(_agent) {} + +ManagementAgent::Periodic::~Periodic () {} + +void ManagementAgent::Periodic::fire () +{ + agent.timer.add (intrusive_ptr (new Periodic (agent, agent.interval))); + agent.periodicProcessing (); +} + +void ManagementAgent::clientAdded (const std::string& routingKey) +{ + if (routingKey.find("console") != 0) + return; + + clientWasAdded = true; + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) { + char localBuffer[16]; + Buffer outBuffer(localBuffer, 16); + uint32_t outLen; + + encodeHeader(outBuffer, 'x'); + outLen = outBuffer.getPosition(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey); + } +} + +void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet ('A'); + buf.putOctet ('M'); + buf.putOctet ('2'); + buf.putOctet (opcode); + buf.putLong (seq); +} + +bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); + + *opcode = buf.getOctet(); + *seq = buf.getLong(); + + return h1 == 'A' && h2 == 'M' && h3 == '2'; +} + +void ManagementAgent::sendBuffer(Buffer& buf, + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey) +{ + if (exchange.get() == 0) + return; + + intrusive_ptr msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); + AMQFrame header((AMQHeaderBody())); + AMQFrame content((AMQContentBody())); + + content.castBody()->decode(buf, length); + + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = + msg->getFrames().getHeaders()->get(true); + props->setContentLength(length); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + try { + exchange->route(deliverable, routingKey, 0); + } catch(exception&) {} +} + +void ManagementAgent::moveNewObjectsLH() +{ + Mutex::ScopedLock lock (addLock); + for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); + iter != newManagementObjects.end (); + iter++) + managementObjects[iter->first] = iter->second; + newManagementObjects.clear(); +} + +void ManagementAgent::periodicProcessing (void) +{ +#define BUFSIZE 65536 + Mutex::ScopedLock lock (userLock); + char msgChars[BUFSIZE]; + uint32_t contentSize; + string routingKey; + list > deleteList; + + uint64_t uptime = uint64_t(Duration(now())) - startTime; + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + + moveNewObjectsLH(); + + if (clientWasAdded) { + clientWasAdded = false; + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + ManagementObject* object = iter->second; + object->setForcePublish(true); + } + } + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + ManagementObject* object = iter->second; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { + Buffer msgBuffer (msgChars, BUFSIZE); + encodeHeader (msgBuffer, 'c'); + object->writeProperties(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { + Buffer msgBuffer (msgChars, BUFSIZE); + encodeHeader (msgBuffer, 'i'); + object->writeStatistics(msgBuffer); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } + + if (object->isDeleted()) + deleteList.push_back(pair(iter->first, object)); + object->setForcePublish(false); + } + + // Delete flagged objects + for (list >::reverse_iterator iter = deleteList.rbegin(); + iter != deleteList.rend(); + iter++) { + delete iter->second; + managementObjects.erase(iter->first); + } + + if (!deleteList.empty()) { + deleteList.clear(); + deleteOrphanedAgentsLH(); + } + + { + Buffer msgBuffer(msgChars, BUFSIZE); + encodeHeader(msgBuffer, 'h'); + msgBuffer.putLongLong(uint64_t(Duration(now()))); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + routingKey = "console.heartbeat.1.0"; + sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + } +} + +void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence, + uint32_t code, string text) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'z', sequence); + outBuffer.putLong (code); + outBuffer.putShortString (text); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +bool ManagementAgent::dispatchCommand (Deliverable& deliverable, + const string& routingKey, + const FieldTable* /*args*/) +{ + Mutex::ScopedLock lock (userLock); + Message& msg = ((DeliverableMessage&) deliverable).getMessage (); + + // Parse the routing key. This management broker should act as though it + // is bound to the exchange to match the following keys: + // + // agent.1.0.# + // broker + // schema.# + + if (routingKey == "broker") { + dispatchAgentCommandLH(msg); + return false; + } + + else if (routingKey.compare(0, 9, "agent.1.0") == 0) { + dispatchAgentCommandLH(msg); + return false; + } + + else if (routingKey.compare(0, 8, "agent.1.") == 0) { + return authorizeAgentMessageLH(msg); + } + + else if (routingKey.compare(0, 7, "schema.") == 0) { + dispatchAgentCommandLH(msg); + return true; + } + + return true; +} + +void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, + uint32_t sequence, const ConnectionToken* connToken) +{ + string methodName; + string packageName; + string className; + uint8_t hash[16]; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + AclModule* acl = broker->getAcl(); + + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + encodeHeader(outBuffer, 'm', sequence); + + if (acl != 0) { + string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); + map params; + params[acl::PROP_SCHEMAPACKAGE] = packageName; + params[acl::PROP_SCHEMACLASS] = className; + + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + return; + } + } + + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter == managementObjects.end() || iter->second->isDeleted()) { + outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); + } else { + if ((iter->second->getPackageName() != packageName) || + (iter->second->getClassName() != className)) { + outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); + outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); + } + else + try { + outBuffer.record(); + Mutex::ScopedUnlock u(userLock); + iter->second->doMethod(methodName, inBuffer, outBuffer); + } catch(exception& e) { + outBuffer.restore(); + outBuffer.putLong(Manageable::STATUS_EXCEPTION); + outBuffer.putMediumString(e.what()); + } + } + + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) +{ + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'b', sequence); + uuid.encode (outBuffer); + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementAgent::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) +{ + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); + } + + sendCommandComplete (replyToKey, sequence); +} + +void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) +{ + string packageName; + + inBuffer.getShortString(packageName); + findOrAddPackageLH(packageName); +} + +void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string packageName; + + inBuffer.getShortString(packageName); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) + { + ClassMap cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); + cIter++) + { + if (cIter->second.hasSchema()) + { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + } + } + sendCommandComplete(replyToKey, sequence); +} + +void ManagementAgent::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) +{ + string packageName; + SchemaClassKey key; + + uint8_t kind = inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + + PackageMap::iterator pIter = findOrAddPackageLH(packageName); + ClassMap::iterator cIter = pIter->second.find(key); + if (cIter == pIter->second.end() || !cIter->second.hasSchema()) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + uint32_t sequence = nextRequestSequence++; + + encodeHeader (outBuffer, 'S', sequence); + outBuffer.putShortString(packageName); + outBuffer.putShortString(key.name); + outBuffer.putBin128(key.hash); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); + + if (cIter != pIter->second.end()) + pIter->second.erase(key); + + pIter->second.insert(pair(key, SchemaClass(kind, sequence))); + } +} + +void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) +{ + // If the management package is attached locally (embedded in the broker or + // linked in via plug-in), call the schema handler directly. If the package + // is from a remote management agent, send the stored schema information. + + if (writeSchemaCall != 0) + writeSchemaCall(buf); + else + buf.putRawData(buffer, bufferLen); +} + +void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.getShortString (packageName); + inBuffer.getShortString (key.name); + inBuffer.getBin128 (key.hash); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap& cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + SchemaClass& classInfo = cIter->second; + + if (classInfo.hasSchema()) { + encodeHeader(outBuffer, 's', sequence); + classInfo.appendSchema(outBuffer); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + else + sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); + } + else + sendCommandComplete(replyToKey, sequence, 1, "Class key not found"); + } + else + sendCommandComplete(replyToKey, sequence, 1, "Package not found"); +} + +void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) +{ + string packageName; + SchemaClassKey key; + + inBuffer.record(); + inBuffer.getOctet(); + inBuffer.getShortString(packageName); + inBuffer.getShortString(key.name); + inBuffer.getBin128(key.hash); + inBuffer.restore(); + + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) { + ClassMap& cMap = pIter->second; + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { + size_t length = validateSchema(inBuffer, cIter->second.kind); + if (length == 0) { + QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name); + cMap.erase(key); + } else { + cIter->second.buffer = (uint8_t*) malloc(length); + cIter->second.bufferLen = length; + inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); + + // Publish a class-indication message + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'q'); + encodeClassIndication(outBuffer, pIter, cIter); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, mExchange, "schema.class"); + } + } + } +} + +bool ManagementAgent::bankInUse (uint32_t bank) +{ + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); + aIter != remoteAgents.end(); + aIter++) + if (aIter->second->agentBank == bank) + return true; + return false; +} + +uint32_t ManagementAgent::allocateNewBank () +{ + while (bankInUse (nextRemoteBank)) + nextRemoteBank++; + + uint32_t allocated = nextRemoteBank++; + writeData (); + return allocated; +} + +uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank) +{ + if (requestedBank == 0 || bankInUse (requestedBank)) + return allocateNewBank (); + return requestedBank; +} + +void ManagementAgent::deleteOrphanedAgentsLH() +{ + vector deleteList; + + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { + ObjectId connectionRef = aIter->first; + bool found = false; + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + if (iter->first == connectionRef && !iter->second->isDeleted()) { + found = true; + break; + } + } + + if (!found) { + deleteList.push_back(connectionRef); + delete aIter->second; + } + } + + for (vector::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) + remoteAgents.erase(*dIter); + + deleteList.clear(); +} + +void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) +{ + string label; + uint32_t requestedBrokerBank, requestedAgentBank; + uint32_t assignedBank; + ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); + Uuid systemId; + + moveNewObjectsLH(); + deleteOrphanedAgentsLH(); + RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); + if (aIter != remoteAgents.end()) { + // There already exists an agent on this session. Reject the request. + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); + return; + } + + inBuffer.getShortString(label); + systemId.decode(inBuffer); + requestedBrokerBank = inBuffer.getLong(); + requestedAgentBank = inBuffer.getLong(); + assignedBank = assignBankLH(requestedAgentBank); + + RemoteAgent* agent = new RemoteAgent; + agent->brokerBank = brokerBank; + agent->agentBank = assignedBank; + agent->routingKey = replyToKey; + agent->connectionRef = connectionRef; + agent->mgmtObject = new _qmf::Agent (this, agent); + agent->mgmtObject->set_connectionRef(agent->connectionRef); + agent->mgmtObject->set_label (label); + agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); + agent->mgmtObject->set_systemId (systemId); + agent->mgmtObject->set_brokerBank (brokerBank); + agent->mgmtObject->set_agentBank (assignedBank); + addObject (agent->mgmtObject); + + remoteAgents[connectionRef] = agent; + + // Send an Attach Response + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'a', sequence); + outBuffer.putLong (brokerBank); + outBuffer.putLong (assignedBank); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, dExchange, replyToKey); +} + +void ManagementAgent::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +{ + FieldTable ft; + FieldTable::ValuePtr value; + + moveNewObjectsLH(); + + ft.decode(inBuffer); + value = ft.get("_class"); + if (value.get() == 0 || !value->convertsTo()) { + value = ft.get("_objectid"); + if (value.get() == 0 || !value->convertsTo()) + return; + + ObjectId selector(value->get()); + ManagementObjectMap::iterator iter = managementObjects.find(selector); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + sendCommandComplete(replyToKey, sequence); + return; + } + + string className (value->get()); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName () == className) { + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + encodeHeader(outBuffer, 'g', sequence); + object->writeProperties(outBuffer); + object->writeStatistics(outBuffer, true); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + } + + sendCommandComplete(replyToKey, sequence); +} + +bool ManagementAgent::authorizeAgentMessageLH(Message& msg) +{ + Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + + if (msg.encodedSize() > MA_BUFFER_SIZE) + return false; + + msg.encodeContent(inBuffer); + inBuffer.reset(); + + if (!checkHeader(inBuffer, &opcode, &sequence)) + return false; + + if (opcode == 'M') { + // TODO: check method call against ACL list. + AclModule* acl = broker->getAcl(); + if (acl == 0) + return true; + + string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId(); + string packageName; + string className; + uint8_t hash[16]; + string methodName; + + map params; + ObjectId objId(inBuffer); + inBuffer.getShortString(packageName); + inBuffer.getShortString(className); + inBuffer.getBin128(hash); + inBuffer.getShortString(methodName); + + params[acl::PROP_SCHEMAPACKAGE] = packageName; + params[acl::PROP_SCHEMACLASS] = className; + + if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) + return true; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'm', sequence); + outBuffer.putLong(Manageable::STATUS_FORBIDDEN); + outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBuffer(outBuffer, outLen, dExchange, replyToKey); + } + + return false; + } + + return true; +} + +void ManagementAgent::dispatchAgentCommandLH(Message& msg) +{ + Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + uint8_t opcode; + uint32_t sequence; + string replyToKey; + + const framing::MessageProperties* p = + msg.getFrames().getHeaders()->get(); + if (p && p->hasReplyTo()) { + const framing::ReplyTo& rt = p->getReplyTo(); + replyToKey = rt.getRoutingKey(); + } + else + return; + + if (msg.encodedSize() > MA_BUFFER_SIZE) { + QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << + msg.encodedSize()); + return; + } + + msg.encodeContent(inBuffer); + uint32_t bufferLen = inBuffer.getPosition(); + inBuffer.reset(); + + while (inBuffer.getPosition() < bufferLen) { + if (!checkHeader(inBuffer, &opcode, &sequence)) + return; + + if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); + else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); + } +} + +ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name) +{ + PackageMap::iterator pIter = packages.find (name); + if (pIter != packages.end ()) + return pIter; + + // No such package found, create a new map entry. + pair result = + packages.insert(pair(name, ClassMap())); + QPID_LOG (debug, "ManagementAgent added package " << name); + + // Publish a package-indication message + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader (outBuffer, 'p'); + encodePackageIndication (outBuffer, result.first); + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBuffer (outBuffer, outLen, mExchange, "schema.package"); + + return result.first; +} + +void ManagementAgent::addClassLH(uint8_t kind, + PackageMap::iterator pIter, + const string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall) +{ + SchemaClassKey key; + ClassMap& cMap = pIter->second; + + key.name = className; + memcpy(&key.hash, md5Sum, 16); + + ClassMap::iterator cIter = cMap.find(key); + if (cIter != cMap.end()) + return; + + // No such class found, create a new class with local information. + QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" << + key.name); + + cMap.insert(pair(key, SchemaClass(kind, schemaCall))); + cIter = cMap.find(key); +} + +void ManagementAgent::encodePackageIndication(Buffer& buf, + PackageMap::iterator pIter) +{ + buf.putShortString((*pIter).first); +} + +void ManagementAgent::encodeClassIndication(Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter) +{ + SchemaClassKey key = (*cIter).first; + + buf.putOctet((*cIter).second.kind); + buf.putShortString((*pIter).first); + buf.putShortString(key.name); + buf.putBin128(key.hash); +} + +size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind) +{ + if (kind == ManagementItem::CLASS_KIND_TABLE) + return validateTableSchema(inBuffer); + else if (kind == ManagementItem::CLASS_KIND_EVENT) + return validateEventSchema(inBuffer); + return 0; +} + +size_t ManagementAgent::validateTableSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_TABLE) + return 0; + + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t propCount = inBuffer.getShort(); + uint16_t statCount = inBuffer.getShort(); + uint16_t methCount = inBuffer.getShort(); + + for (uint16_t idx = 0; idx < propCount + statCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + + for (uint16_t idx = 0; idx < methCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + if (!ft.isSet("argCount")) + return 0; + int argCount = ft.getAsInt("argCount"); + for (int mIdx = 0; mIdx < argCount; mIdx++) { + FieldTable aft; + aft.decode(inBuffer); + } + } + } catch (exception& /*e*/) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} + +size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) +{ + uint32_t start = inBuffer.getPosition(); + uint32_t end; + string text; + uint8_t hash[16]; + + try { + inBuffer.record(); + uint8_t kind = inBuffer.getOctet(); + if (kind != ManagementItem::CLASS_KIND_EVENT) + return 0; + + inBuffer.getShortString(text); + inBuffer.getShortString(text); + inBuffer.getBin128(hash); + + uint16_t argCount = inBuffer.getShort(); + + for (uint16_t idx = 0; idx < argCount; idx++) { + FieldTable ft; + ft.decode(inBuffer); + } + } catch (exception& /*e*/) { + return 0; + } + + end = inBuffer.getPosition(); + inBuffer.restore(); // restore original position + return end - start; +} + +void ManagementAgent::setAllocator(std::auto_ptr a) +{ + Mutex::ScopedLock lock (addLock); + allocator = a; +} + +uint64_t ManagementAgent::allocateId(Manageable* object) +{ + Mutex::ScopedLock lock (addLock); + if (allocator.get()) return allocator->getIdFor(object); + return 0; +} diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h new file mode 100644 index 0000000000..2411e6c277 --- /dev/null +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -0,0 +1,248 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Timer.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" +#include "qpid/broker/ConnectionToken.h" +#include "ManagementObject.h" +#include "ManagementEvent.h" +#include "Manageable.h" +#include "qmf/org/apache/qpid/broker/Agent.h" +#include +#include + +namespace qpid { +namespace management { + +struct IdAllocator; + +class ManagementAgent +{ +private: + + int threadPoolSize; + +public: + typedef enum { + SEV_EMERG = 0, + SEV_ALERT = 1, + SEV_CRIT = 2, + SEV_ERROR = 3, + SEV_WARN = 4, + SEV_NOTE = 5, + SEV_INFO = 6, + SEV_DEBUG = 7, + SEV_DEFAULT = 8 + } severity_t; + + ManagementAgent (); + virtual ~ManagementAgent (); + + void configure (const std::string& dataDir, uint16_t interval, + qpid::broker::Broker* broker, int threadPoolSize); + void setInterval (uint16_t _interval) { interval = _interval; } + void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, + qpid::broker::Exchange::shared_ptr directExchange); + int getMaxThreads () { return threadPoolSize; } + QPID_BROKER_EXTERN void registerClass (const std::string& packageName, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN void registerEvent (const std::string& packageName, + const std::string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, + uint64_t persistId = 0); + QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, + severity_t severity = SEV_DEFAULT); + QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); + + bool dispatchCommand (qpid::broker::Deliverable& msg, + const std::string& routingKey, + const framing::FieldTable* args); + + const framing::Uuid& getUuid() const { return uuid; } + + void setAllocator(std::auto_ptr allocator); + uint64_t allocateId(Manageable* object); +private: + struct Periodic : public qpid::broker::TimerTask + { + ManagementAgent& agent; + + Periodic (ManagementAgent& agent, uint32_t seconds); + virtual ~Periodic (); + void fire (); + }; + + // Storage for tracking remote management agents, attached via the client + // management agent API. + // + struct RemoteAgent : public Manageable + { + uint32_t brokerBank; + uint32_t agentBank; + std::string routingKey; + ObjectId connectionRef; + qmf::org::apache::qpid::broker::Agent* mgmtObject; + ManagementObject* GetManagementObject (void) const { return mgmtObject; } + virtual ~RemoteAgent (); + }; + + // TODO: Eventually replace string with entire reply-to structure. reply-to + // currently assumes that the exchange is "amq.direct" even though it could + // in theory be specified differently. + typedef std::map RemoteAgentMap; + typedef std::vector ReplyToVector; + + // Storage for known schema classes: + // + // SchemaClassKey -- Key elements for map lookups + // SchemaClassKeyComp -- Comparison class for SchemaClassKey + // SchemaClass -- Non-key elements for classes + // + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass + { + uint8_t kind; + ManagementObject::writeSchemaCall_t writeSchemaCall; + uint32_t pendingSequence; + size_t bufferLen; + uint8_t* buffer; + + SchemaClass(uint8_t _kind, uint32_t seq) : + kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} + SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : + kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} + bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } + void appendSchema (framing::Buffer& buf); + }; + + typedef std::map ClassMap; + typedef std::map PackageMap; + + RemoteAgentMap remoteAgents; + PackageMap packages; + ManagementObjectMap managementObjects; + ManagementObjectMap newManagementObjects; + + static ManagementAgent* agent; + static bool enabled; + + framing::Uuid uuid; + sys::Mutex addLock; + sys::Mutex userLock; + qpid::broker::Timer timer; + qpid::broker::Exchange::shared_ptr mExchange; + qpid::broker::Exchange::shared_ptr dExchange; + std::string dataDir; + uint16_t interval; + qpid::broker::Broker* broker; + uint16_t bootSequence; + uint32_t nextObjectId; + uint32_t brokerBank; + uint32_t nextRemoteBank; + uint32_t nextRequestSequence; + bool clientWasAdded; + const uint64_t startTime; + + std::auto_ptr allocator; + +# define MA_BUFFER_SIZE 65536 + char inputBuffer[MA_BUFFER_SIZE]; + char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; + + void writeData (); + void periodicProcessing (void); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void sendBuffer (framing::Buffer& buf, + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + std::string routingKey); + void moveNewObjectsLH(); + + bool authorizeAgentMessageLH(qpid::broker::Message& msg); + void dispatchAgentCommandLH(qpid::broker::Message& msg); + + PackageMap::iterator findOrAddPackageLH(std::string name); + void addClassLH(uint8_t kind, + PackageMap::iterator pIter, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void encodePackageIndication (framing::Buffer& buf, + PackageMap::iterator pIter); + void encodeClassIndication (framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + bool bankInUse (uint32_t bank); + uint32_t allocateNewBank (); + uint32_t assignBankLH (uint32_t requestedPrefix); + void deleteOrphanedAgentsLH(); + void sendCommandComplete (std::string replyToKey, uint32_t sequence, + uint32_t code = 0, std::string text = std::string("OK")); + void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + + size_t validateSchema(framing::Buffer&, uint8_t kind); + size_t validateTableSchema(framing::Buffer&); + size_t validateEventSchema(framing::Buffer&); +}; + +}} + +#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp deleted file mode 100644 index 19300ef1af..0000000000 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ /dev/null @@ -1,1161 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementBroker.h" -#include "IdAllocator.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/log/Statement.h" -#include -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/sys/Time.h" -#include "qpid/broker/ConnectionState.h" -#include "qpid/broker/AclModule.h" -#include -#include -#include - -using boost::intrusive_ptr; -using qpid::framing::Uuid; -using namespace qpid::framing; -using namespace qpid::management; -using namespace qpid::broker; -using namespace qpid::sys; -using namespace std; -namespace _qmf = qmf::org::apache::qpid::broker; - -Mutex ManagementAgent::Singleton::lock; -bool ManagementAgent::Singleton::disabled = false; -ManagementAgent* ManagementAgent::Singleton::agent = 0; -int ManagementAgent::Singleton::refCount = 0; - -ManagementAgent::Singleton::Singleton(bool disableManagement) -{ - Mutex::ScopedLock _lock(lock); - if (disableManagement && !disabled) { - disabled = true; - assert(refCount == 0); // can't disable after agent has been allocated - } - if (refCount == 0 && !disabled) - agent = new ManagementBroker(); - refCount++; -} - -ManagementAgent::Singleton::~Singleton() -{ - Mutex::ScopedLock _lock(lock); - refCount--; - if (refCount == 0 && !disabled) { - delete agent; - agent = 0; - } -} - -ManagementAgent* ManagementAgent::Singleton::getInstance() -{ - return agent; -} - -ManagementBroker::RemoteAgent::~RemoteAgent () -{ - if (mgmtObject != 0) - mgmtObject->resourceDestroy(); -} - -ManagementBroker::ManagementBroker () : - threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now()))) -{ - nextObjectId = 1; - brokerBank = 1; - bootSequence = 1; - nextRemoteBank = 10; - nextRequestSequence = 1; - clientWasAdded = false; -} - -ManagementBroker::~ManagementBroker () -{ - timer.stop(); - { - Mutex::ScopedLock lock (userLock); - - // Reset the shared pointers to exchanges. If this is not done now, the exchanges - // will stick around until dExchange and mExchange are implicitely destroyed (long - // after this destructor completes). Those exchanges hold references to management - // objects that will be invalid. - dExchange.reset(); - mExchange.reset(); - - moveNewObjectsLH(); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - delete object; - } - managementObjects.clear(); - } -} - -void ManagementBroker::configure(const string& _dataDir, uint16_t _interval, - qpid::broker::Broker* _broker, int _threads) -{ - dataDir = _dataDir; - interval = _interval; - broker = _broker; - threadPoolSize = _threads; - timer.add (intrusive_ptr (new Periodic(*this, interval))); - - // Get from file or generate and save to file. - if (dataDir.empty()) - { - uuid.generate(); - QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: " - << uuid); - } - else - { - string filename(dataDir + "/.mbrokerdata"); - ifstream inFile(filename.c_str ()); - - if (inFile.good()) - { - inFile >> uuid; - inFile >> bootSequence; - inFile >> nextRemoteBank; - inFile.close(); - QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid); - - // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. - bootSequence++; - if (bootSequence & 0xF000) - bootSequence = 1; - writeData(); - } - else - { - uuid.generate(); - QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid); - writeData(); - } - - QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence); - } -} - -void ManagementBroker::writeData () -{ - string filename (dataDir + "/.mbrokerdata"); - ofstream outFile (filename.c_str ()); - - if (outFile.good()) - { - outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; - outFile.close(); - } -} - -void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchange, - qpid::broker::Exchange::shared_ptr _dexchange) -{ - mExchange = _mexchange; - dExchange = _dexchange; -} - -void ManagementBroker::registerClass (const string& packageName, - const string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - Mutex::ScopedLock lock(userLock); - PackageMap::iterator pIter = findOrAddPackageLH(packageName); - addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); -} - -void ManagementBroker::registerEvent (const string& packageName, - const string& eventName, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - Mutex::ScopedLock lock(userLock); - PackageMap::iterator pIter = findOrAddPackageLH(packageName); - addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); -} - -ObjectId ManagementBroker::addObject (ManagementObject* object, - uint64_t persistId) -{ - Mutex::ScopedLock lock (addLock); - uint16_t sequence; - uint64_t objectNum; - - if (persistId == 0) { - sequence = bootSequence; - objectNum = nextObjectId++; - } else { - sequence = 0; - objectNum = persistId; - } - - ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum); - - object->setObjectId(objId); - newManagementObjects[objId] = object; - return objId; -} - -void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t severity) -{ - Mutex::ScopedLock lock (userLock); - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; - - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); - outBuffer.putOctet(sev); - event.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, mExchange, - "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); -} - -ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds) - : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {} - -ManagementBroker::Periodic::~Periodic () {} - -void ManagementBroker::Periodic::fire () -{ - broker.timer.add (intrusive_ptr (new Periodic (broker, broker.interval))); - broker.periodicProcessing (); -} - -void ManagementBroker::clientAdded (const std::string& routingKey) -{ - if (routingKey.find("console") != 0) - return; - - clientWasAdded = true; - for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); - aIter != remoteAgents.end(); - aIter++) { - char localBuffer[16]; - Buffer outBuffer(localBuffer, 16); - uint32_t outLen; - - encodeHeader(outBuffer, 'x'); - outLen = outBuffer.getPosition(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, aIter->second->routingKey); - } -} - -void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet ('A'); - buf.putOctet ('M'); - buf.putOctet ('2'); - buf.putOctet (opcode); - buf.putLong (seq); -} - -bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - uint8_t h1 = buf.getOctet(); - uint8_t h2 = buf.getOctet(); - uint8_t h3 = buf.getOctet(); - - *opcode = buf.getOctet(); - *seq = buf.getLong(); - - return h1 == 'A' && h2 == 'M' && h3 == '2'; -} - -void ManagementBroker::sendBuffer(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - string routingKey) -{ - if (exchange.get() == 0) - return; - - intrusive_ptr msg(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody())); - - content.castBody()->decode(buf, length); - - method.setEof(false); - header.setBof(false); - header.setEof(false); - content.setBof(false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = - msg->getFrames().getHeaders()->get(true); - props->setContentLength(length); - msg->getFrames().append(content); - - DeliverableMessage deliverable (msg); - try { - exchange->route(deliverable, routingKey, 0); - } catch(exception&) {} -} - -void ManagementBroker::moveNewObjectsLH() -{ - Mutex::ScopedLock lock (addLock); - for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); - iter != newManagementObjects.end (); - iter++) - managementObjects[iter->first] = iter->second; - newManagementObjects.clear(); -} - -void ManagementBroker::periodicProcessing (void) -{ -#define BUFSIZE 65536 - Mutex::ScopedLock lock (userLock); - char msgChars[BUFSIZE]; - uint32_t contentSize; - string routingKey; - list > deleteList; - - uint64_t uptime = uint64_t(Duration(now())) - startTime; - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); - - moveNewObjectsLH(); - - if (clientWasAdded) { - clientWasAdded = false; - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - object->setForcePublish(true); - } - } - - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) { - Buffer msgBuffer (msgChars, BUFSIZE); - encodeHeader (msgBuffer, 'c'); - object->writeProperties(msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - - if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { - Buffer msgBuffer (msgChars, BUFSIZE); - encodeHeader (msgBuffer, 'i'); - object->writeStatistics(msgBuffer); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } - - if (object->isDeleted()) - deleteList.push_back(pair(iter->first, object)); - object->setForcePublish(false); - } - - // Delete flagged objects - for (list >::reverse_iterator iter = deleteList.rbegin(); - iter != deleteList.rend(); - iter++) { - delete iter->second; - managementObjects.erase(iter->first); - } - - if (!deleteList.empty()) { - deleteList.clear(); - deleteOrphanedAgentsLH(); - } - - { - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "console.heartbeat.1.0"; - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); - } -} - -void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, - uint32_t code, string text) -{ - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader (outBuffer, 'z', sequence); - outBuffer.putLong (code); - outBuffer.putShortString (text); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -bool ManagementBroker::dispatchCommand (Deliverable& deliverable, - const string& routingKey, - const FieldTable* /*args*/) -{ - Mutex::ScopedLock lock (userLock); - Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - - // Parse the routing key. This management broker should act as though it - // is bound to the exchange to match the following keys: - // - // agent.1.0.# - // broker - // schema.# - - if (routingKey == "broker") { - dispatchAgentCommandLH(msg); - return false; - } - - else if (routingKey.compare(0, 9, "agent.1.0") == 0) { - dispatchAgentCommandLH(msg); - return false; - } - - else if (routingKey.compare(0, 8, "agent.1.") == 0) { - return authorizeAgentMessageLH(msg); - } - - else if (routingKey.compare(0, 7, "schema.") == 0) { - dispatchAgentCommandLH(msg); - return true; - } - - return true; -} - -void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, - uint32_t sequence, const ConnectionToken* connToken) -{ - string methodName; - string packageName; - string className; - uint8_t hash[16]; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - AclModule* acl = broker->getAcl(); - - ObjectId objId(inBuffer); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); - encodeHeader(outBuffer, 'm', sequence); - - if (acl != 0) { - string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); - map params; - params[acl::PROP_SCHEMAPACKAGE] = packageName; - params[acl::PROP_SCHEMACLASS] = className; - - if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - return; - } - } - - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { - outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER)); - } - else - try { - outBuffer.record(); - Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inBuffer, outBuffer); - } catch(exception& e) { - outBuffer.restore(); - outBuffer.putLong(Manageable::STATUS_EXCEPTION); - outBuffer.putMediumString(e.what()); - } - } - - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence) -{ - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader (outBuffer, 'b', sequence); - uuid.encode (outBuffer); - - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence) -{ - for (PackageMap::iterator pIter = packages.begin (); - pIter != packages.end (); - pIter++) - { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader (outBuffer, 'p', sequence); - encodePackageIndication (outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); - } - - sendCommandComplete (replyToKey, sequence); -} - -void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/) -{ - string packageName; - - inBuffer.getShortString(packageName); - findOrAddPackageLH(packageName); -} - -void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - string packageName; - - inBuffer.getShortString(packageName); - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) - { - ClassMap cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); - cIter != cMap.end(); - cIter++) - { - if (cIter->second.hasSchema()) - { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'q', sequence); - encodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - } - } - } - sendCommandComplete(replyToKey, sequence); -} - -void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t) -{ - string packageName; - SchemaClassKey key; - - uint8_t kind = inBuffer.getOctet(); - inBuffer.getShortString(packageName); - inBuffer.getShortString(key.name); - inBuffer.getBin128(key.hash); - - PackageMap::iterator pIter = findOrAddPackageLH(packageName); - ClassMap::iterator cIter = pIter->second.find(key); - if (cIter == pIter->second.end() || !cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - uint32_t sequence = nextRequestSequence++; - - encodeHeader (outBuffer, 'S', sequence); - outBuffer.putShortString(packageName); - outBuffer.putShortString(key.name); - outBuffer.putBin128(key.hash); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); - - if (cIter != pIter->second.end()) - pIter->second.erase(key); - - pIter->second.insert(pair(key, SchemaClass(kind, sequence))); - } -} - -void ManagementBroker::SchemaClass::appendSchema(Buffer& buf) -{ - // If the management package is attached locally (embedded in the broker or - // linked in via plug-in), call the schema handler directly. If the package - // is from a remote management agent, send the stored schema information. - - if (writeSchemaCall != 0) - writeSchemaCall(buf); - else - buf.putRawData(buffer, bufferLen); -} - -void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - string packageName; - SchemaClassKey key; - - inBuffer.getShortString (packageName); - inBuffer.getShortString (key.name); - inBuffer.getBin128 (key.hash); - - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) { - ClassMap& cMap = pIter->second; - ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - SchemaClass& classInfo = cIter->second; - - if (classInfo.hasSchema()) { - encodeHeader(outBuffer, 's', sequence); - classInfo.appendSchema(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - } - else - sendCommandComplete(replyToKey, sequence, 1, "Schema not available"); - } - else - sendCommandComplete(replyToKey, sequence, 1, "Class key not found"); - } - else - sendCommandComplete(replyToKey, sequence, 1, "Package not found"); -} - -void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence) -{ - string packageName; - SchemaClassKey key; - - inBuffer.record(); - inBuffer.getOctet(); - inBuffer.getShortString(packageName); - inBuffer.getShortString(key.name); - inBuffer.getBin128(key.hash); - inBuffer.restore(); - - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) { - ClassMap& cMap = pIter->second; - ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { - size_t length = validateSchema(inBuffer, cIter->second.kind); - if (length == 0) { - QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name); - cMap.erase(key); - } else { - cIter->second.buffer = (uint8_t*) malloc(length); - cIter->second.bufferLen = length; - inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen); - - // Publish a class-indication message - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'q'); - encodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, mExchange, "schema.class"); - } - } - } -} - -bool ManagementBroker::bankInUse (uint32_t bank) -{ - for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); - aIter != remoteAgents.end(); - aIter++) - if (aIter->second->agentBank == bank) - return true; - return false; -} - -uint32_t ManagementBroker::allocateNewBank () -{ - while (bankInUse (nextRemoteBank)) - nextRemoteBank++; - - uint32_t allocated = nextRemoteBank++; - writeData (); - return allocated; -} - -uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) -{ - if (requestedBank == 0 || bankInUse (requestedBank)) - return allocateNewBank (); - return requestedBank; -} - -void ManagementBroker::deleteOrphanedAgentsLH() -{ - vector deleteList; - - for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { - ObjectId connectionRef = aIter->first; - bool found = false; - - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - if (iter->first == connectionRef && !iter->second->isDeleted()) { - found = true; - break; - } - } - - if (!found) { - deleteList.push_back(connectionRef); - delete aIter->second; - } - } - - for (vector::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) - remoteAgents.erase(*dIter); - - deleteList.clear(); -} - -void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) -{ - string label; - uint32_t requestedBrokerBank, requestedAgentBank; - uint32_t assignedBank; - ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); - Uuid systemId; - - moveNewObjectsLH(); - deleteOrphanedAgentsLH(); - RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); - if (aIter != remoteAgents.end()) { - // There already exists an agent on this session. Reject the request. - sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); - return; - } - - inBuffer.getShortString(label); - systemId.decode(inBuffer); - requestedBrokerBank = inBuffer.getLong(); - requestedAgentBank = inBuffer.getLong(); - assignedBank = assignBankLH(requestedAgentBank); - - RemoteAgent* agent = new RemoteAgent; - agent->brokerBank = brokerBank; - agent->agentBank = assignedBank; - agent->routingKey = replyToKey; - agent->connectionRef = connectionRef; - agent->mgmtObject = new _qmf::Agent (this, agent); - agent->mgmtObject->set_connectionRef(agent->connectionRef); - agent->mgmtObject->set_label (label); - agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_systemId (systemId); - agent->mgmtObject->set_brokerBank (brokerBank); - agent->mgmtObject->set_agentBank (assignedBank); - addObject (agent->mgmtObject); - - remoteAgents[connectionRef] = agent; - - // Send an Attach Response - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader (outBuffer, 'a', sequence); - outBuffer.putLong (brokerBank); - outBuffer.putLong (assignedBank); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer (outBuffer, outLen, dExchange, replyToKey); -} - -void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) -{ - FieldTable ft; - FieldTable::ValuePtr value; - - moveNewObjectsLH(); - - ft.decode(inBuffer); - value = ft.get("_class"); - if (value.get() == 0 || !value->convertsTo()) { - value = ft.get("_objectid"); - if (value.get() == 0 || !value->convertsTo()) - return; - - ObjectId selector(value->get()); - ManagementObjectMap::iterator iter = managementObjects.find(selector); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - } - sendCommandComplete(replyToKey, sequence); - return; - } - - string className (value->get()); - - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName () == className) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(outBuffer); - object->writeStatistics(outBuffer, true); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - } - } - - sendCommandComplete(replyToKey, sequence); -} - -bool ManagementBroker::authorizeAgentMessageLH(Message& msg) -{ - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; - string replyToKey; - - if (msg.encodedSize() > MA_BUFFER_SIZE) - return false; - - msg.encodeContent(inBuffer); - inBuffer.reset(); - - if (!checkHeader(inBuffer, &opcode, &sequence)) - return false; - - if (opcode == 'M') { - // TODO: check method call against ACL list. - AclModule* acl = broker->getAcl(); - if (acl == 0) - return true; - - string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId(); - string packageName; - string className; - uint8_t hash[16]; - string methodName; - - map params; - ObjectId objId(inBuffer); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); - - params[acl::PROP_SCHEMAPACKAGE] = packageName; - params[acl::PROP_SCHEMACLASS] = className; - - if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) - return true; - - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get(); - if (p && p->hasReplyTo()) { - const framing::ReplyTo& rt = p->getReplyTo(); - replyToKey = rt.getRoutingKey(); - - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'm', sequence); - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBuffer(outBuffer, outLen, dExchange, replyToKey); - } - - return false; - } - - return true; -} - -void ManagementBroker::dispatchAgentCommandLH(Message& msg) -{ - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - uint32_t sequence; - string replyToKey; - - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get(); - if (p && p->hasReplyTo()) { - const framing::ReplyTo& rt = p->getReplyTo(); - replyToKey = rt.getRoutingKey(); - } - else - return; - - if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementBroker::dispatchAgentCommandLH: Message too large: " << - msg.encodedSize()); - return; - } - - msg.encodeContent(inBuffer); - uint32_t bufferLen = inBuffer.getPosition(); - inBuffer.reset(); - - while (inBuffer.getPosition() < bufferLen) { - if (!checkHeader(inBuffer, &opcode, &sequence)) - return; - - if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); - else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); - else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); - } -} - -ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(string name) -{ - PackageMap::iterator pIter = packages.find (name); - if (pIter != packages.end ()) - return pIter; - - // No such package found, create a new map entry. - pair result = - packages.insert(pair(name, ClassMap())); - QPID_LOG (debug, "ManagementBroker added package " << name); - - // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader (outBuffer, 'p'); - encodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBuffer (outBuffer, outLen, mExchange, "schema.package"); - - return result.first; -} - -void ManagementBroker::addClassLH(uint8_t kind, - PackageMap::iterator pIter, - const string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - SchemaClassKey key; - ClassMap& cMap = pIter->second; - - key.name = className; - memcpy(&key.hash, md5Sum, 16); - - ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end()) - return; - - // No such class found, create a new class with local information. - QPID_LOG (debug, "ManagementBroker added class " << pIter->first << ":" << - key.name); - - cMap.insert(pair(key, SchemaClass(kind, schemaCall))); - cIter = cMap.find(key); -} - -void ManagementBroker::encodePackageIndication(Buffer& buf, - PackageMap::iterator pIter) -{ - buf.putShortString((*pIter).first); -} - -void ManagementBroker::encodeClassIndication(Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) -{ - SchemaClassKey key = (*cIter).first; - - buf.putOctet((*cIter).second.kind); - buf.putShortString((*pIter).first); - buf.putShortString(key.name); - buf.putBin128(key.hash); -} - -size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind) -{ - if (kind == ManagementItem::CLASS_KIND_TABLE) - return validateTableSchema(inBuffer); - else if (kind == ManagementItem::CLASS_KIND_EVENT) - return validateEventSchema(inBuffer); - return 0; -} - -size_t ManagementBroker::validateTableSchema(Buffer& inBuffer) -{ - uint32_t start = inBuffer.getPosition(); - uint32_t end; - string text; - uint8_t hash[16]; - - try { - inBuffer.record(); - uint8_t kind = inBuffer.getOctet(); - if (kind != ManagementItem::CLASS_KIND_TABLE) - return 0; - - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); - - uint16_t propCount = inBuffer.getShort(); - uint16_t statCount = inBuffer.getShort(); - uint16_t methCount = inBuffer.getShort(); - - for (uint16_t idx = 0; idx < propCount + statCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - } - - for (uint16_t idx = 0; idx < methCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - if (!ft.isSet("argCount")) - return 0; - int argCount = ft.getAsInt("argCount"); - for (int mIdx = 0; mIdx < argCount; mIdx++) { - FieldTable aft; - aft.decode(inBuffer); - } - } - } catch (exception& /*e*/) { - return 0; - } - - end = inBuffer.getPosition(); - inBuffer.restore(); // restore original position - return end - start; -} - -size_t ManagementBroker::validateEventSchema(Buffer& inBuffer) -{ - uint32_t start = inBuffer.getPosition(); - uint32_t end; - string text; - uint8_t hash[16]; - - try { - inBuffer.record(); - uint8_t kind = inBuffer.getOctet(); - if (kind != ManagementItem::CLASS_KIND_EVENT) - return 0; - - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); - - uint16_t argCount = inBuffer.getShort(); - - for (uint16_t idx = 0; idx < argCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - } - } catch (exception& /*e*/) { - return 0; - } - - end = inBuffer.getPosition(); - inBuffer.restore(); // restore original position - return end - start; -} - -void ManagementBroker::setAllocator(std::auto_ptr a) -{ - Mutex::ScopedLock lock (addLock); - allocator = a; -} - -uint64_t ManagementBroker::allocateId(Manageable* object) -{ - Mutex::ScopedLock lock (addLock); - if (allocator.get()) return allocator->getIdFor(object); - return 0; -} diff --git a/cpp/src/qpid/management/ManagementBroker.h b/cpp/src/qpid/management/ManagementBroker.h deleted file mode 100644 index a57f73be15..0000000000 --- a/cpp/src/qpid/management/ManagementBroker.h +++ /dev/null @@ -1,242 +0,0 @@ -#ifndef _ManagementBroker_ -#define _ManagementBroker_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/Options.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Timer.h" -#include "qpid/framing/Uuid.h" -#include "qpid/sys/Mutex.h" -#include "qpid/broker/ConnectionToken.h" -#include "qpid/agent/ManagementAgent.h" -#include "ManagementObject.h" -#include "Manageable.h" -#include "qmf/org/apache/qpid/broker/Agent.h" -#include -#include - -namespace qpid { -namespace management { - -struct IdAllocator; - -class ManagementBroker : public ManagementAgent -{ -private: - - int threadPoolSize; - -public: - ManagementBroker (); - virtual ~ManagementBroker (); - - void configure (const std::string& dataDir, uint16_t interval, - qpid::broker::Broker* broker, int threadPoolSize); - void setInterval (uint16_t _interval) { interval = _interval; } - void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, - qpid::broker::Exchange::shared_ptr directExchange); - int getMaxThreads () { return threadPoolSize; } - void registerClass (const std::string& packageName, - const std::string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void registerEvent (const std::string& packageName, - const std::string& eventName, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0); - void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); - void clientAdded (const std::string& routingKey); - bool dispatchCommand (qpid::broker::Deliverable& msg, - const std::string& routingKey, - const framing::FieldTable* args); - const framing::Uuid& getUuid() const { return uuid; } - - // Stubs for remote management agent calls - void init(const std::string&, uint16_t, uint16_t, bool, - const std::string&, const std::string&, const std::string&, - const std::string&, const std::string&) { assert(0); } - void init(const client::ConnectionSettings&, uint16_t, bool, const std::string&) { assert(0); } - uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } - int getSignalFd () { assert(0); return -1; } - - void setAllocator(std::auto_ptr allocator); - uint64_t allocateId(Manageable* object); -private: - friend class ManagementAgent; - - struct Periodic : public qpid::broker::TimerTask - { - ManagementBroker& broker; - - Periodic (ManagementBroker& broker, uint32_t seconds); - virtual ~Periodic (); - void fire (); - }; - - // Storage for tracking remote management agents, attached via the client - // management agent API. - // - struct RemoteAgent : public Manageable - { - uint32_t brokerBank; - uint32_t agentBank; - std::string routingKey; - ObjectId connectionRef; - qmf::org::apache::qpid::broker::Agent* mgmtObject; - ManagementObject* GetManagementObject (void) const { return mgmtObject; } - virtual ~RemoteAgent (); - }; - - // TODO: Eventually replace string with entire reply-to structure. reply-to - // currently assumes that the exchange is "amq.direct" even though it could - // in theory be specified differently. - typedef std::map RemoteAgentMap; - typedef std::vector ReplyToVector; - - // Storage for known schema classes: - // - // SchemaClassKey -- Key elements for map lookups - // SchemaClassKeyComp -- Comparison class for SchemaClassKey - // SchemaClass -- Non-key elements for classes - // - struct SchemaClassKey - { - std::string name; - uint8_t hash[16]; - }; - - struct SchemaClassKeyComp - { - bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const - { - if (lhs.name != rhs.name) - return lhs.name < rhs.name; - else - for (int i = 0; i < 16; i++) - if (lhs.hash[i] != rhs.hash[i]) - return lhs.hash[i] < rhs.hash[i]; - return false; - } - }; - - struct SchemaClass - { - uint8_t kind; - ManagementObject::writeSchemaCall_t writeSchemaCall; - uint32_t pendingSequence; - size_t bufferLen; - uint8_t* buffer; - - SchemaClass(uint8_t _kind, uint32_t seq) : - kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} - SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : - kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} - bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } - void appendSchema (framing::Buffer& buf); - }; - - typedef std::map ClassMap; - typedef std::map PackageMap; - - RemoteAgentMap remoteAgents; - PackageMap packages; - ManagementObjectMap managementObjects; - ManagementObjectMap newManagementObjects; - - static ManagementAgent* agent; - static bool enabled; - - framing::Uuid uuid; - sys::Mutex addLock; - sys::Mutex userLock; - qpid::broker::Timer timer; - qpid::broker::Exchange::shared_ptr mExchange; - qpid::broker::Exchange::shared_ptr dExchange; - std::string dataDir; - uint16_t interval; - qpid::broker::Broker* broker; - uint16_t bootSequence; - uint32_t nextObjectId; - uint32_t brokerBank; - uint32_t nextRemoteBank; - uint32_t nextRequestSequence; - bool clientWasAdded; - const uint64_t startTime; - - std::auto_ptr allocator; - -# define MA_BUFFER_SIZE 65536 - char inputBuffer[MA_BUFFER_SIZE]; - char outputBuffer[MA_BUFFER_SIZE]; - char eventBuffer[MA_BUFFER_SIZE]; - - void writeData (); - void periodicProcessing (void); - void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendBuffer (framing::Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - std::string routingKey); - void moveNewObjectsLH(); - - bool authorizeAgentMessageLH(qpid::broker::Message& msg); - void dispatchAgentCommandLH(qpid::broker::Message& msg); - - PackageMap::iterator findOrAddPackageLH(std::string name); - void addClassLH(uint8_t kind, - PackageMap::iterator pIter, - const std::string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void encodePackageIndication (framing::Buffer& buf, - PackageMap::iterator pIter); - void encodeClassIndication (framing::Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter); - bool bankInUse (uint32_t bank); - uint32_t allocateNewBank (); - uint32_t assignBankLH (uint32_t requestedPrefix); - void deleteOrphanedAgentsLH(); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, - uint32_t code = 0, std::string text = std::string("OK")); - void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - - size_t validateSchema(framing::Buffer&, uint8_t kind); - size_t validateTableSchema(framing::Buffer&); - size_t validateEventSchema(framing::Buffer&); -}; - -}} - -#endif /*!_ManagementBroker_*/ diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp index 4dcafbfcdd..0793b2d18c 100644 --- a/cpp/src/qpid/management/ManagementExchange.cpp +++ b/cpp/src/qpid/management/ManagementExchange.cpp @@ -27,14 +27,14 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent) : - Exchange (_name, _parent), TopicExchange(_name, _parent) {} +ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) : + Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {} ManagementExchange::ManagementExchange (const std::string& _name, bool _durable, const FieldTable& _args, - Manageable* _parent) : - Exchange (_name, _durable, _args, _parent), - TopicExchange(_name, _durable, _args, _parent) {} + Manageable* _parent, Broker* b) : + Exchange (_name, _durable, _args, _parent, b), + TopicExchange(_name, _durable, _args, _parent, b) {} void ManagementExchange::route (Deliverable& msg, const string& routingKey, @@ -60,7 +60,7 @@ bool ManagementExchange::bind (Queue::shared_ptr queue, return TopicExchange::bind(queue, routingKey, args); } -void ManagementExchange::setManagmentAgent (ManagementBroker* agent) +void ManagementExchange::setManagmentAgent (ManagementAgent* agent) { managementAgent = agent; } diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h index d54db1a74e..5e51683515 100644 --- a/cpp/src/qpid/management/ManagementExchange.h +++ b/cpp/src/qpid/management/ManagementExchange.h @@ -22,7 +22,7 @@ #define _ManagementExchange_ #include "qpid/broker/TopicExchange.h" -#include "ManagementBroker.h" +#include "ManagementAgent.h" namespace qpid { namespace broker { @@ -30,15 +30,15 @@ namespace broker { class ManagementExchange : public virtual TopicExchange { private: - management::ManagementBroker* managementAgent; + management::ManagementAgent* managementAgent; public: static const std::string typeName; - ManagementExchange (const string& name, Manageable* _parent = 0); + ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0); ManagementExchange (const string& _name, bool _durable, const qpid::framing::FieldTable& _args, - Manageable* _parent = 0); + Manageable* _parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } @@ -50,7 +50,7 @@ class ManagementExchange : public virtual TopicExchange const string& routingKey, const qpid::framing::FieldTable* args); - void setManagmentAgent (management::ManagementBroker* agent); + void setManagmentAgent (management::ManagementAgent* agent); virtual ~ManagementExchange(); }; diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index f4c45de126..08008b3d79 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -21,7 +21,6 @@ #include "Manageable.h" #include "ManagementObject.h" -#include "qpid/agent/ManagementAgent.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Thread.h" @@ -156,6 +155,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) }} +int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; void ManagementObject::writeTimestamps (framing::Buffer& buf) @@ -176,7 +176,7 @@ int ManagementObject::getThreadIndex() { if (thisIndex == -1) { sys::Mutex::ScopedLock mutex(accessLock); thisIndex = nextThreadIndex; - if (nextThreadIndex < agent->getMaxThreads() - 1) + if (nextThreadIndex < maxThreads - 1) nextThreadIndex++; } return thisIndex; diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h index 498169318d..15c2307886 100644 --- a/cpp/src/qpid/management/ManagementObject.h +++ b/cpp/src/qpid/management/ManagementObject.h @@ -32,7 +32,6 @@ namespace qpid { namespace management { class Manageable; -class ManagementAgent; class ObjectId; @@ -111,7 +110,7 @@ public: class ManagementObject : public ManagementItem { - protected: +protected: uint64_t createTime; uint64_t destroyTime; @@ -122,8 +121,6 @@ class ManagementObject : public ManagementItem bool deleted; Manageable* coreObject; sys::Mutex accessLock; - ManagementAgent* agent; - int maxThreads; uint32_t flags; static int nextThreadIndex; @@ -133,13 +130,14 @@ class ManagementObject : public ManagementItem QPID_COMMON_EXTERN void writeTimestamps(qpid::framing::Buffer& buf); public: + static int maxThreads; typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&); - ManagementObject(ManagementAgent* _agent, Manageable* _core) : + ManagementObject(Manageable* _core) : createTime(uint64_t(qpid::sys::Duration(qpid::sys::now()))), destroyTime(0), updateTime(createTime), configChanged(true), instChanged(true), deleted(false), - coreObject(_core), agent(_agent), forcePublish(false) {} + coreObject(_core), forcePublish(false) {} virtual ~ManagementObject() {} virtual writeSchemaCall_t getWriteSchemaCall() = 0; -- cgit v1.2.1