diff options
Diffstat (limited to 'qpid/cpp/src')
35 files changed, 1118 insertions, 1220 deletions
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp index f1f9009568..a681a6d18d 100644 --- a/qpid/cpp/src/posix/QpiddBroker.cpp +++ b/qpid/cpp/src/posix/QpiddBroker.cpp @@ -144,7 +144,7 @@ struct QpiddDaemon : public Daemon { uint16_t port=brokerPtr->getPort(options->daemon.transport); ready(port); // Notify parent. if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) { - dynamic_cast<qmf::org::apache::qpid::broker::Broker*>(brokerPtr->GetManagementObject())->set_port(port); + boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); } brokerPtr->run(); } @@ -200,7 +200,7 @@ int QpiddBroker::execute (QpiddOptions *options) { uint16_t port = brokerPtr->getPort(myOptions->daemon.transport); cout << port << endl; if (options->broker.enableMgmt) { - dynamic_cast<qmf::org::apache::qpid::broker::Broker*>(brokerPtr->GetManagementObject())->set_port(port); + boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); } } brokerPtr->run(); diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index cd79add720..61e0b56104 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -52,7 +52,7 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::acl; -Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0), +Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal)), resourceCounter(new ResourceCounter(*this, aclValues.aclMaxQueuesPerUser)){ @@ -60,7 +60,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals if (agent != 0){ _qmf::Package packageInit(agent); - mgmtObject = new _qmf::Acl (agent, this, broker); + mgmtObject = _qmf::Acl::shared_ptr(new _qmf::Acl (agent, this, broker)); agent->addObject (mgmtObject); mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal); mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp); @@ -317,9 +317,9 @@ Acl::~Acl(){ broker->getConnectionObservers().remove(connectionCounter); } -ManagementObject* Acl::GetManagementObject(void) const +ManagementObject::shared_ptr Acl::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t Acl::ManagementMethod (uint32_t methodId, Args& args, string& text) diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h index a23952ff53..28cbfb8f3f 100644 --- a/qpid/cpp/src/qpid/acl/Acl.h +++ b/qpid/cpp/src/qpid/acl/Acl.h @@ -62,7 +62,7 @@ private: broker::Broker* broker; bool transferAcl; boost::shared_ptr<AclData> data; - qmf::org::apache::qpid::acl::Acl* mgmtObject; // mgnt owns lifecycle + qmf::org::apache::qpid::acl::Acl::shared_ptr mgmtObject; qpid::management::ManagementAgent* agent; mutable qpid::sys::Mutex dataLock; boost::shared_ptr<ConnectionCounter> connectionCounter; @@ -113,7 +113,7 @@ private: bool readAclFile(std::string& aclFile, std::string& errorText); Manageable::status_t lookup (management::Args& args, std::string& text); Manageable::status_t lookupPublish(management::Args& args, std::string& text); - virtual qpid::management::ManagementObject* GetManagementObject(void) const; + virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index d1706b5907..dfc99bb834 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -60,7 +60,7 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args, InitializeCallback init, const std::string& _queueName, const string& ae) : - link(_link), channel(_id), args(_args), mgmtObject(0), + link(_link), channel(_id), args(_args), listener(l), name(_name), queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() : _queueName), @@ -71,10 +71,10 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, { ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Bridge + mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); + args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync)); mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } @@ -296,9 +296,9 @@ uint32_t Bridge::encodedSize() const + 2; // sync } -management::ManagementObject* Bridge::GetManagementObject (void) const +management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const { - return (management::ManagementObject*) mgmtObject; + return mgmtObject; } management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index ee298afd45..2b4d019ccd 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -72,7 +72,7 @@ class Bridge : public PersistableConfig, bool isDetached() const { return detached; } - management::ManagementObject* GetManagementObject() const; + management::ManagementObject::shared_ptr GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); @@ -128,7 +128,7 @@ class Bridge : public PersistableConfig, Link* const link; const framing::ChannelId channel; qmf::org::apache::qpid::broker::ArgsLinkBridge args; - qmf::org::apache::qpid::broker::Bridge* mgmtObject; + qmf::org::apache::qpid::broker::Bridge::shared_ptr mgmtObject; CancellationListener listener; std::string name; std::string queueName; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 32b0fdd46e..0c466aea07 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -214,7 +214,6 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - mgmtObject(0), queueCleaner(queues, &timer), recoveryInProgress(false), recovery(true), @@ -235,7 +234,7 @@ Broker::Broker(const Broker::Options& conf) : System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); systemObject = System::shared_ptr(system); - mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"); + mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker")); mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); @@ -450,9 +449,9 @@ Broker::~Broker() { QPID_LOG(notice, "Shut down"); } -ManagementObject* Broker::GetManagementObject(void) const +ManagementObject::shared_ptr Broker::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable* Broker::GetVhostObject(void) const diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 9b90330cb9..11757657b8 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -171,7 +171,7 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::shared_ptr<sys::ConnectionCodec::Factory> factory; DtxManager dtxManager; SessionManager sessionManager; - qmf::org::apache::qpid::broker::Broker* mgmtObject; + qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; QueueCleaner queueCleaner; @@ -230,7 +230,7 @@ class Broker : public sys::Runnable, public Plugin::Target, SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } - QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const; QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const; QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod( uint32_t methodId, management::Args& args, std::string& text); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 08a9c756d0..043325a4fa 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -97,7 +97,6 @@ Connection::Connection(ConnectionOutputHandler* out_, link(link_), mgmtClosing(false), mgmtId(mgmtId_), - mgmtObject(0), links(broker_.getLinks()), agent(0), timer(broker_.getTimer()), @@ -119,7 +118,7 @@ void Connection::addManagementObject() { agent = broker.getManagementAgent(); if (agent != 0) { // TODO set last bool true if system connection - mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !link, false); + mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false)); mgmtObject->set_shadow(shadow); agent->addObject(mgmtObject, objectId); } @@ -403,9 +402,9 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject* Connection::GetManagementObject(void) const +ManagementObject::shared_ptr Connection::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&) diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index d01599ce54..3ef9877750 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -112,7 +112,7 @@ class Connection : public sys::ConnectionInputHandler, void closeChannel(framing::ChannelId channel); // Manageable entry points - management::ManagementObject* GetManagementObject (void) const; + management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string&); @@ -196,7 +196,7 @@ class Connection : public sys::ConnectionInputHandler, const std::string mgmtId; sys::Mutex ioCallbackLock; std::queue<boost::function0<void> > ioCallbacks; - qmf::org::apache::qpid::broker::Connection* mgmtObject; + qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject; LinkRegistry& links; management::ManagementAgent* agent; sys::Timer& timer; @@ -231,7 +231,7 @@ class Connection : public sys::ConnectionInputHandler, public: - qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } + qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; } }; }} diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index a484cc054e..d1dd1fae6c 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -150,7 +150,7 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProp void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) { const framing::FieldTable& clientProperties = body.getClientProperties(); - qmf::org::apache::qpid::broker::Connection* mgmtObject = connection.getMgmtObject(); + qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject(); if (mgmtObject != 0) { string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME); diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index bb5dc2b807..12360df81d 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -165,19 +165,19 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) + sequenceNo(0), ive(false), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); + mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name)); mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); agent->addObject(mgmtExchange, 0, durable); if (broker) - brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); } } } @@ -185,20 +185,20 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent, Broker* b) : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), - args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) + args(_args), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); + mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name)); mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); mgmtExchange->set_arguments(ManagementAgent::toMap(args)); agent->addObject(mgmtExchange, 0, durable); if (broker) - brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); } } @@ -294,9 +294,9 @@ void Exchange::recoveryComplete(ExchangeRegistry& exchanges) } } -ManagementObject* Exchange::GetManagementObject (void) const +ManagementObject::shared_ptr Exchange::GetManagementObject (void) const { - return (ManagementObject*) mgmtExchange; + return mgmtExchange; } void Exchange::registerDynamicBridge(DynamicBridge* db) @@ -345,16 +345,16 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent, FieldTable _args, const string& _origin) - : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0) + : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin) { } Exchange::Binding::~Binding () { if (mgmtBinding != 0) { - ManagementObject* mo = queue->GetManagementObject(); + _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); if (mo != 0) - static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); + mo->dec_bindingCount(); mgmtBinding->resourceDestroy (); } } @@ -367,25 +367,25 @@ void Exchange::Binding::startManagement() if (broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - ManagementObject* mo = queue->GetManagementObject(); + _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); if (mo != 0) { management::ObjectId queueId = mo->getObjectId(); - mgmtBinding = new _qmf::Binding - (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)); + mgmtBinding = _qmf::Binding::shared_ptr(new _qmf::Binding + (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args))); if (!origin.empty()) mgmtBinding->set_origin(origin); agent->addObject(mgmtBinding); - static_cast<_qmf::Queue*>(mo)->inc_bindingCount(); + mo->inc_bindingCount(); } } } } } -ManagementObject* Exchange::Binding::GetManagementObject () const +ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const { - return (ManagementObject*) mgmtBinding; + return mgmtBinding; } Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {} diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 26ee5b0054..517b551a83 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -52,13 +52,13 @@ public: const std::string key; const framing::FieldTable args; std::string origin; - qmf::org::apache::qpid::broker::Binding* mgmtBinding; + qmf::org::apache::qpid::broker::Binding::shared_ptr mgmtBinding; Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0, framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); ~Binding(); void startManagement(); - management::ManagementObject* GetManagementObject() const; + management::ManagementObject::shared_ptr GetManagementObject() const; }; private: @@ -159,8 +159,8 @@ protected: } }; - qmf::org::apache::qpid::broker::Exchange* mgmtExchange; - qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; + qmf::org::apache::qpid::broker::Exchange::shared_ptr mgmtExchange; + qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject; public: typedef boost::shared_ptr<Exchange> shared_ptr; @@ -210,7 +210,7 @@ public: static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject(void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; // Federation hooks class DynamicBridge { diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index afa5623ecd..b014217180 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -143,7 +143,7 @@ Link::Link(const string& _name, host(_host), port(_port), transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), - persistenceId(0), mgmtObject(0), broker(_broker), state(0), + persistenceId(0), broker(_broker), state(0), visitCount(0), currentInterval(1), closing(false), @@ -161,7 +161,7 @@ Link::Link(const string& _name, agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Link(agent, this, parent, name, durable); + mgmtObject = _qmf::Link::shared_ptr(new _qmf::Link(agent, this, parent, name, durable)); mgmtObject->set_host(host); mgmtObject->set_port(port); mgmtObject->set_transport(transport); @@ -638,9 +638,9 @@ uint32_t Link::encodedSize() const + password.size() + 1; } -ManagementObject* Link::GetManagementObject (void) const +ManagementObject::shared_ptr Link::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } void Link::close() { diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index f0cb90e73b..49ee3ef36a 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -69,7 +69,7 @@ class Link : public PersistableConfig, public management::Manageable { std::string username; std::string password; mutable uint64_t persistenceId; - qmf::org::apache::qpid::broker::Link* mgmtObject; + qmf::org::apache::qpid::broker::Link::shared_ptr mgmtObject; Broker* broker; int state; uint32_t visitCount; @@ -181,7 +181,7 @@ class Link : public PersistableConfig, public management::Manageable { static bool isEncodedLink(const std::string& key); // Manageable entry points - management::ManagementObject* GetManagementObject(void) const; + management::ManagementObject::shared_ptr GetManagementObject(void) const; management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); // manage the exchange owned by this link diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 9cf2f541ce..9dc9ec7a6d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -77,8 +77,8 @@ namespace { inline void mgntEnqStats(const Message& msg, - _qmf::Queue* mgmtObject, - _qmf::Broker* brokerMgmtObject) + _qmf::Queue::shared_ptr mgmtObject, + _qmf::Broker::shared_ptr brokerMgmtObject) { if (mgmtObject != 0) { _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); @@ -101,8 +101,8 @@ inline void mgntEnqStats(const Message& msg, } inline void mgntDeqStats(const Message& msg, - _qmf::Queue* mgmtObject, - _qmf::Broker* brokerMgmtObject) + _qmf::Queue::shared_ptr mgmtObject, + _qmf::Broker::shared_ptr brokerMgmtObject) { if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); @@ -179,8 +179,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, messages(new MessageDeque()), persistenceId(0), settings(b ? merge(_settings, b->getOptions()) : _settings), - mgmtObject(0), - brokerMgmtObject(0), eventMode(0), broker(b), deleted(false), @@ -195,17 +193,17 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, qpid::amqp_0_10::translate(settings.asMap(), encodableSettings); if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); - if (agent != 0) { - mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete); + mgmtObject = _qmf::Queue::shared_ptr( + new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete)); mgmtObject->set_arguments(settings.asMap()); agent->addObject(mgmtObject, 0, store != 0); - brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject()); if (brokerMgmtObject) brokerMgmtObject->inc_queueCount(); } } - + if ( settings.isBrowseOnly ) { QPID_LOG ( info, "Queue " << name << " is browse-only." ); } @@ -213,11 +211,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, Queue::~Queue() { - if (mgmtObject != 0) { - mgmtObject->resourceDestroy(); - if (brokerMgmtObject) - brokerMgmtObject->dec_queueCount(); - } } bool isLocalTo(const OwnershipToken* token, const Message& msg) @@ -1076,6 +1069,12 @@ void Queue::destroyed() boost::bind(&QueueObserver::destroy, _1)); observers.clear(); } + + if (mgmtObject != 0) { + mgmtObject->resourceDestroy(); + if (brokerMgmtObject) + brokerMgmtObject->dec_queueCount(); + } } void Queue::notifyDeleted() @@ -1109,7 +1108,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore) { - ManagementObject* childObj = externalQueueStore->GetManagementObject(); + ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject(); if (childObj != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1263,7 +1262,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { externalQueueStore = inst; if (inst) { - ManagementObject* childObj = inst->GetManagementObject(); + ManagementObject::shared_ptr childObj = inst->GetManagementObject(); if (childObj != 0 && mgmtObject != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1311,9 +1310,9 @@ void Queue::countLoadedFromDisk(uint64_t size) const } -ManagementObject* Queue::GetManagementObject (void) const +ManagementObject::shared_ptr Queue::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index f6ea6e0adb..32e9201b5b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -151,8 +151,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, std::string alternateExchangeName; boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; - qmf::org::apache::qpid::broker::Queue* mgmtObject; - qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; + qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject; + qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject; sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; @@ -339,7 +339,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const; // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const; diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index c52cdee6a4..944cc7e838 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -68,7 +68,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), - flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) + flowStopped(false), count(0), size(0), broker(0) { uint32_t maxCount(0); uint64_t maxSize(0); @@ -78,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount(); if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize(); broker = queue->getBroker(); - queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject()); + queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject()); if (queueMgmtObj) { queueMgmtObj->set_flowStopped(isFlowControlActive()); } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 1bcc388ceb..0e83457efa 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -31,14 +31,8 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" +#include "qmf/org/apache/qpid/broker/Queue.h" -namespace qmf { -namespace org { -namespace apache { -namespace qpid { -namespace broker { - class Queue; -}}}}} namespace _qmfBroker = qmf::org::apache::qpid::broker; namespace qpid { @@ -118,7 +112,7 @@ struct QueueSettings; std::map<framing::SequenceNumber, Message > index; mutable qpid::sys::Mutex indexLock; - _qmfBroker::Queue *queueMgmtObj; + _qmfBroker::Queue::shared_ptr queueMgmtObj; const Broker *broker; diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 2d7c820b63..bc7c96f08c 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -424,7 +424,7 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response) &challenge, &challenge_len); processAuthenticationStep(code, challenge, challenge_len); - qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); + qmf::org::apache::qpid::broker::Connection::shared_ptr cnxMgmt = connection.getMgmtObject(); if ( cnxMgmt ) cnxMgmt->set_saslMechanism(mechanism); } @@ -507,7 +507,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr if (ssf) { securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); } - qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); + qmf::org::apache::qpid::broker::Connection::shared_ptr cnxMgmt = connection.getMgmtObject(); if ( cnxMgmt ) cnxMgmt->set_saslSsf(ssf); return securityLayer; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 65530394a3..f3d97dc078 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -300,8 +300,7 @@ Consumer(_name, type), arguments(_arguments), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), - deliveryCount(0), - mgmtObject(0) + deliveryCount(0) { if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) { @@ -310,17 +309,17 @@ Consumer(_name, type), if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), - !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); + mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), + !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments))); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); } } } -ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const +ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 67cfe808d0..1d7ccbfa9c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -96,7 +96,7 @@ class SemanticState : private boost::noncopyable { bool notifyEnabled; const int syncFrequency; int deliveryCount; - qmf::org::apache::qpid::broker::Subscription* mgmtObject; + qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject; bool checkCredit(const Message& msg); void allocateCredit(const Message& msg); @@ -160,7 +160,7 @@ class SemanticState : private boost::noncopyable { void acknowledged(const DeliveryRecord&) {} // manageable entry points - QPID_BROKER_EXTERN management::ManagementObject* + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; QPID_BROKER_EXTERN management::Manageable::status_t diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index fe357eb949..42be45f7ce 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -58,7 +58,6 @@ SessionState::SessionState( broker(b), handler(&h), semanticState(*this), adapter(semanticState), - mgmtObject(0), asyncCommandCompleter(new AsyncCommandCompleter(this)) { if (!delayManagement) addManagementObject(); @@ -71,8 +70,8 @@ void SessionState::addManagementObject() { if (parent != 0) { ManagementAgent* agent = getBroker().getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Session - (agent, this, parent, getId().getName()); + mgmtObject = _qmf::Session::shared_ptr(new _qmf::Session + (agent, this, parent, getId().getName())); mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); @@ -149,9 +148,9 @@ void SessionState::giveReadCredit(int32_t credit) { getConnection().outputTasks.giveReadCredit(credit); } -ManagementObject* SessionState::GetManagementObject (void) const +ManagementObject::shared_ptr SessionState::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 5e3a77d7ed..af384ff761 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -110,7 +110,7 @@ class SessionState : public qpid::SessionState, const qpid::types::Variant::Map& annotations, bool sync); // Manageable entry points - management::ManagementObject* GetManagementObject (void) const; + management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string&); @@ -168,7 +168,7 @@ class SessionState : public qpid::SessionState, SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; - qmf::org::apache::qpid::broker::Session* mgmtObject; + qmf::org::apache::qpid::broker::Session::shared_ptr mgmtObject; qpid::framing::SequenceSet accepted; // sequence numbers for pending received Execution.Sync commands diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index fa8df6406b..8d54427fdc 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -31,7 +31,7 @@ using namespace qpid::broker; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; -System::System (string _dataDir, Broker* broker) : mgmtObject(0) +System::System (string _dataDir, Broker* broker) { ManagementAgent* agent = broker ? broker->getManagementAgent() : 0; @@ -64,7 +64,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) } } - mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array())); + mgmtObject = _qmf::System::shared_ptr(new _qmf::System(agent, this, types::Uuid(systemId.c_array()))); qpid::sys::SystemInfo::getSystemId (osName, nodeName, release, diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h index 6847c662ae..591d2a14a6 100644 --- a/qpid/cpp/src/qpid/broker/System.h +++ b/qpid/cpp/src/qpid/broker/System.h @@ -35,7 +35,7 @@ class System : public management::Manageable { private: - qmf::org::apache::qpid::broker::System* mgmtObject; + qmf::org::apache::qpid::broker::System::shared_ptr mgmtObject; framing::Uuid systemId; std::string osName, nodeName, release, version, machine; @@ -45,7 +45,7 @@ class System : public management::Manageable System (std::string _dataDir, Broker* broker = 0); - management::ManagementObject* GetManagementObject (void) const + management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp index a9ca3b42ab..e72118b570 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.cpp +++ b/qpid/cpp/src/qpid/broker/Vhost.cpp @@ -29,7 +29,7 @@ namespace qpid { namespace management { class Manageable; }} -Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmtObject(0) +Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) { if (parentBroker != 0 && broker != 0) { @@ -37,7 +37,7 @@ Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmt if (agent != 0) { - mgmtObject = new _qmf::Vhost(agent, this, parentBroker, "/"); + mgmtObject = _qmf::Vhost::shared_ptr(new _qmf::Vhost(agent, this, parentBroker, "/")); agent->addObject(mgmtObject, 0, true); } } diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h index 9554d641c2..599b821870 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.h +++ b/qpid/cpp/src/qpid/broker/Vhost.h @@ -32,7 +32,7 @@ class Vhost : public management::Manageable { private: - qmf::org::apache::qpid::broker::Vhost* mgmtObject; + qmf::org::apache::qpid::broker::Vhost::shared_ptr mgmtObject; public: @@ -40,7 +40,7 @@ class Vhost : public management::Manageable Vhost (management::Manageable* parentBroker, Broker* broker = 0); - management::ManagementObject* GetManagementObject (void) const + management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } void setFederationTag(const std::string& tag); }; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 877fa021d0..c3d0598249 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -64,7 +64,6 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) systemId(broker.getSystem()->getSystemId().data()), settings(s), observer(new ConnectionObserver(*this, systemId)), - mgmtObject(0), status(STANDALONE), membership(systemId), replicationTest(s.replicateDefault.get()) @@ -95,7 +94,7 @@ void HaBroker::initialize() { if (settings.cluster && !ma) throw Exception("Cannot start HA: management is disabled"); _qmf::Package packageInit(ma); - mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); + mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker")); mgmtObject->set_replicateDefault(settings.replicateDefault.str()); mgmtObject->set_systemId(systemId); ma->addObject(mgmtObject); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 48db0a8d3c..530211ced4 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -71,7 +71,7 @@ class HaBroker : public management::Manageable void initialize(); // Implement Manageable. - qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; } + qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; } management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); @@ -124,7 +124,7 @@ class HaBroker : public management::Manageable boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary boost::shared_ptr<Backup> backup; boost::shared_ptr<Primary> primary; - qmf::org::apache::qpid::ha::HaBroker* mgmtObject; + qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; Url clientUrl, brokerUrl; std::vector<Url> knownBrokers; BrokerStatus status; diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index ce4e545c80..b933c71bbb 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -91,9 +91,8 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { void RemoteBackup::ready(const QueuePtr& q) { catchupQueues.erase(q); - QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() - << QueueSetPrinter(", waiting for: ", catchupQueues)); - if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); + QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", " + << catchupQueues.size() << " remain to catch up"); } // Called via ConfigurationObserver::queueCreate and from catchupQueue diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp index 01fceb7783..17613ce3dd 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp +++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp @@ -87,8 +87,8 @@ void StatusCheckThread::run() { string status = details["status"].getString(); if (status != "joining") { statusCheck.setPromote(false); - QPID_LOG(error, statusCheck.logPrefix << "Broker " << url << " status is " << status - << " this broker will refuse promotion."); + QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is " + << status << ", this broker will refuse promotion."); } QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 474c86ed48..8e19b68284 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -62,6 +62,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { + const size_t qmfV1BufferSize(65536); const string defaultVendorName("vendor"); const string defaultProductName("product"); @@ -113,9 +114,8 @@ ManagementAgent::RemoteAgent::~RemoteAgent () QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); if (mgmtObject != 0) { mgmtObject->resourceDestroy(); - agent.deleteObjectNowLH(mgmtObject->getObjectId()); - delete mgmtObject; - mgmtObject = 0; + agent.deleteObjectNow(mgmtObject->getObjectId()); + mgmtObject.reset(); } } @@ -124,8 +124,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), - qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), - msgBuffer(MA_BUFFER_SIZE), memstat(0) + qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100) { nextObjectId = 1; brokerBank = 1; @@ -136,7 +135,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : attrMap["_vendor"] = defaultVendorName; attrMap["_product"] = defaultProductName; - memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"); + memstat = _qmf::Memory::shared_ptr(new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker")); addObject(memstat, "amqp-broker"); } @@ -155,15 +154,6 @@ ManagementAgent::~ManagementAgent () v2Direct.reset(); remoteAgents.clear(); - - moveNewObjectsLH(); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - delete object; - } - managementObjects.clear(); } } @@ -316,11 +306,12 @@ void ManagementAgent::registerEvent (const string& packageName, } // Deprecated: V1 objects -ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool persistent) +ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, uint64_t persistId, bool persistent) { uint16_t sequence; uint64_t objectNum; + sys::Mutex::ScopedLock lock(addLock); sequence = persistent ? 0 : bootSequence; objectNum = persistId ? persistId : nextObjectId++; @@ -329,17 +320,14 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId object->setObjectId(objId); - { - sys::Mutex::ScopedLock lock(addLock); - newManagementObjects.push_back(object); - } + newManagementObjects.push_back(object); QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); return objId; } -ObjectId ManagementAgent::addObject(ManagementObject* object, +ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, const string& key, bool persistent) { @@ -369,12 +357,11 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi "emerg", "alert", "crit", "error", "warn", "note", "info", "debug" }; - sys::Mutex::ScopedLock lock (userLock); uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; if (qmf1Support) { - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + char buffer[qmfV1BufferSize]; + Buffer outBuffer(buffer, qmfV1BufferSize); encodeHeader(outBuffer, 'e'); outBuffer.putShortString(event.getPackageName()); @@ -385,9 +372,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi string sBuf; event.encode(sBuf); outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, mExchange, + sendBuffer(outBuffer, mExchange, "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); } @@ -426,7 +411,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi Variant::List list_; list_.push_back(map_); ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str()); QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); } } @@ -480,12 +465,9 @@ void ManagementAgent::clientAdded (const string& routingKey) while (rkeys.size()) { char localBuffer[16]; Buffer outBuffer(localBuffer, 16); - uint32_t outLen; encodeHeader(outBuffer, 'x'); - outLen = outBuffer.getPosition(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); + sendBuffer(outBuffer, dExchange, rkeys.front()); QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front()); rkeys.pop_front(); } @@ -496,8 +478,8 @@ void ManagementAgent::clusterUpdate() { // Set clientWasAdded so that on the next periodicProcessing we will do // a full update on all cluster members. sys::Mutex::ScopedLock l(userLock); - moveNewObjectsLH(); // keep lists consistent with updater/updatee. - moveDeletedObjectsLH(); + moveNewObjects(); // keep lists consistent with updater/updatee. + moveDeletedObjects(); clientWasAdded = true; debugSnapshot("Cluster member joined"); } @@ -523,12 +505,9 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) return h1 == 'A' && h2 == 'M' && h3 == '2'; } -// NOTE WELL: assumes userLock is held by caller (LH) -// NOTE EVEN WELLER: drops this lock when delivering the message!!! -void ManagementAgent::sendBufferLH(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey) +void ManagementAgent::sendBuffer(Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const string& routingKey) { if (suppressed) { QPID_LOG(debug, "Suppressing management message to " << routingKey); @@ -541,6 +520,8 @@ void ManagementAgent::sendBufferLH(Buffer& buf, AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody())); + size_t length = buf.getPosition(); + buf.reset(); content.castBody<AMQContentBody>()->decode(buf, length); method.setEof(false); @@ -564,38 +545,31 @@ void ManagementAgent::sendBufferLH(Buffer& buf, Message msg(transfer, transfer); msg.setIsManagementMessage(true); - { - sys::Mutex::ScopedUnlock u(userLock); - - DeliverableMessage deliverable (msg, 0); - try { - exchange->route(deliverable); - } catch(exception&) {} - } + DeliverableMessage deliverable (msg, 0); + try { + exchange->route(deliverable); + } catch(exception&) {} buf.reset(); } -void ManagementAgent::sendBufferLH(Buffer& buf, - uint32_t length, - const string& exchange, - const string& routingKey) +void ManagementAgent::sendBuffer(Buffer& buf, + const string& exchange, + const string& routingKey) { qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); if (ex.get() != 0) - sendBufferLH(buf, length, ex, routingKey); + sendBuffer(buf, ex, routingKey); } -// NOTE WELL: assumes userLock is held by caller (LH) -// NOTE EVEN WELLER: drops this lock when delivering the message!!! -void ManagementAgent::sendBufferLH(const string& data, - const string& cid, - const Variant::Map& headers, - const string& content_type, - qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey, - uint64_t ttl_msec) +void ManagementAgent::sendBuffer(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const string& routingKey, + uint64_t ttl_msec) { Variant::Map::const_iterator i; @@ -643,34 +617,30 @@ void ManagementAgent::sendBufferLH(const string& data, msg.setIsManagementMessage(true); msg.computeExpiration(broker->getExpiryPolicy()); - { - sys::Mutex::ScopedUnlock u(userLock); - - DeliverableMessage deliverable (msg, 0); - try { - exchange->route(deliverable); - } catch(exception&) {} - } + DeliverableMessage deliverable (msg,0); + try { + exchange->route(deliverable); + } catch(exception&) {} } -void ManagementAgent::sendBufferLH(const string& data, - const string& cid, - const Variant::Map& headers, - const string& content_type, - const string& exchange, - const string& routingKey, - uint64_t ttl_msec) +void ManagementAgent::sendBuffer(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + const string& exchange, + const string& routingKey, + uint64_t ttl_msec) { qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); if (ex.get() != 0) - sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec); + sendBuffer(data, cid, headers, content_type, ex, routingKey, ttl_msec); } /** Objects that have been added since the last periodic poll are temporarily * saved in the newManagementObjects list. This allows objects to be - * added without needing to block on the userLock (addLock is used instead). + * added without needing to block on the userLock (objectLock is used instead). * These new objects need to be integrated into the object database * (managementObjects) *before* they can be properly managed. This routine * performs the integration. @@ -680,34 +650,33 @@ void ManagementAgent::sendBufferLH(const string& data, * duplicate object ids. To avoid clashes, don't put deleted objects * into the active object database. */ -void ManagementAgent::moveNewObjectsLH() +void ManagementAgent::moveNewObjects() { - sys::Mutex::ScopedLock lock (addLock); + sys::Mutex::ScopedLock lock(addLock); + sys::Mutex::ScopedLock objLock (objectLock); while (!newManagementObjects.empty()) { - ManagementObject *object = newManagementObjects.back(); + ManagementObject::shared_ptr object = newManagementObjects.back(); newManagementObjects.pop_back(); if (object->isDeleted()) { DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support)); pendingDeletedObjs[dptr->getKey()].push_back(dptr); - delete object; } else { // add to active object list, check for duplicates. ObjectId oid = object->getObjectId(); ManagementObjectMap::iterator destIter = managementObjects.find(oid); if (destIter != managementObjects.end()) { // duplicate found. It is OK if the old object has been marked // deleted, just replace the old with the new. - ManagementObject *oldObj = destIter->second; - if (oldObj->isDeleted()) { - DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); - pendingDeletedObjs[dptr->getKey()].push_back(dptr); - delete oldObj; - } else { + ManagementObject::shared_ptr oldObj = destIter->second; + if (!oldObj->isDeleted()) { // Duplicate non-deleted objects? This is a user error - oids must be unique. // for now, leak the old object (safer than deleting - may still be referenced) // and complain loudly... QPID_LOG(error, "Detected two management objects with the same identifier: " << oid); + oldObj->resourceDestroy(); } + DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); // QPID-3666: be sure to replace the -index- also, as non-key members of // the index object may be different for the new object! So erase the // entry, rather than []= assign here: @@ -720,32 +689,41 @@ void ManagementAgent::moveNewObjectsLH() void ManagementAgent::periodicProcessing (void) { -#define BUFSIZE 65536 #define HEADROOM 4096 debugSnapshot("Management agent periodic processing"); sys::Mutex::ScopedLock lock (userLock); - uint32_t contentSize; string routingKey; string sBuf; - moveNewObjectsLH(); + moveNewObjects(); // // If we're publishing updates, get the latest memory statistics and uptime now // if (publish) { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); - qpid::sys::MemStat::loadMemInfo(memstat); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + qpid::sys::MemStat::loadMemInfo(memstat.get()); + } + + // + // Use a copy of the management object map to avoid holding the objectLock + // + ManagementObjectVector localManagementObjects; + { + sys::Mutex::ScopedLock objLock(objectLock); + std::transform(managementObjects.begin(), managementObjects.end(), + std::back_inserter(localManagementObjects), + boost::bind(&ManagementObjectMap::value_type::second, _1)); } // // Clear the been-here flag on all objects in the map. // - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); + for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); + iter != localManagementObjects.end(); iter++) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = *iter; object->setFlags(0); if (clientWasAdded) { object->setForcePublish(true); @@ -760,22 +738,25 @@ void ManagementAgent::periodicProcessing (void) // if we sent the active update first, _then_ the delete update, clients // would incorrectly think the object was deleted. See QPID-2997 // - bool objectsDeleted = moveDeletedObjectsLH(); + bool objectsDeleted = moveDeletedObjects(); + PendingDeletedObjsMap localPendingDeletedObjs; + { + sys::Mutex::ScopedLock objLock(objectLock); + localPendingDeletedObjs.swap(pendingDeletedObjs); + } // // If we are not publishing updates, just clear the pending deletes. There's no // need to tell anybody. // if (!publish) - pendingDeletedObjs.clear(); - - if (!pendingDeletedObjs.empty()) { - // use a temporary copy of the pending deletes so dropping the lock when - // the buffer is sent is safe. - PendingDeletedObjsMap tmp(pendingDeletedObjs); - pendingDeletedObjs.clear(); + localPendingDeletedObjs.clear(); - for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { + ResizableBuffer msgBuffer(qmfV1BufferSize); + if (!localPendingDeletedObjs.empty()) { + for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin(); + mIter != localPendingDeletedObjs.end(); + mIter++) { std::string packageName; std::string className; msgBuffer.reset(); @@ -807,11 +788,10 @@ void ManagementAgent::periodicProcessing (void) } if (v1Objs >= maxReplyObjs) { v1Objs = 0; - contentSize = msgBuffer.getSize(); stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); } @@ -840,7 +820,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } @@ -850,11 +830,10 @@ void ManagementAgent::periodicProcessing (void) // send any remaining objects... if (v1Objs) { - contentSize = BUFSIZE - msgBuffer.available(); stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); } @@ -877,7 +856,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } @@ -885,9 +864,7 @@ void ManagementAgent::periodicProcessing (void) } // - // Process the entire object map. Remember: we drop the userLock each time we call - // sendBuffer(). This allows the managementObjects map to be altered during the - // sendBuffer() call, so always restart the search after a sendBuffer() call + // Process the entire object map. // // If publish is disabled, don't send any updates. // @@ -897,14 +874,14 @@ void ManagementAgent::periodicProcessing (void) uint32_t pcount; uint32_t scount; uint32_t v1Objs, v2Objs; - ManagementObjectMap::iterator baseIter; + ManagementObjectVector::iterator baseIter; std::string packageName; std::string className; - for (baseIter = managementObjects.begin(); - baseIter != managementObjects.end(); + for (baseIter = localManagementObjects.begin(); + baseIter != localManagementObjects.end(); baseIter++) { - ManagementObject* baseObject = baseIter->second; + ManagementObject::shared_ptr baseObject = *baseIter; // // Skip until we find a base object requiring processing... // @@ -915,7 +892,7 @@ void ManagementAgent::periodicProcessing (void) } } - if (baseIter == managementObjects.end()) + if (baseIter == localManagementObjects.end()) break; // done - all objects processed pcount = scount = 0; @@ -924,12 +901,12 @@ void ManagementAgent::periodicProcessing (void) list_.clear(); msgBuffer.reset(); - for (ManagementObjectMap::iterator iter = baseIter; - iter != managementObjects.end(); + for (ManagementObjectVector::iterator iter = baseIter; + iter != localManagementObjects.end(); iter++) { msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space - ManagementObject* baseObject = baseIter->second; - ManagementObject* object = iter->second; + ManagementObject::shared_ptr baseObject = *baseIter; + ManagementObject::shared_ptr object = *iter; bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); @@ -1004,12 +981,11 @@ void ManagementAgent::periodicProcessing (void) if (pcount || scount) { if (qmf1Support) { - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { + if (msgBuffer.getPosition() > 0) { stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount @@ -1035,7 +1011,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount @@ -1045,21 +1021,21 @@ void ManagementAgent::periodicProcessing (void) } } // end processing updates for all objects - if (objectsDeleted) deleteOrphanedAgentsLH(); + if (objectsDeleted) { + sys::Mutex::ScopedLock lock (userLock); + deleteOrphanedAgentsLH(); + } // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled. if (qmf1Support) { - uint32_t contentSize; - char msgChars[BUFSIZE]; - Buffer msgBuffer(msgChars, BUFSIZE); + char msgChars[qmfV1BufferSize]; + Buffer msgBuffer(msgChars, qmfV1BufferSize); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now()))); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; - sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); + sendBuffer(msgBuffer, mExchange, routingKey); QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); } @@ -1087,23 +1063,26 @@ void ManagementAgent::periodicProcessing (void) // Set TTL (in msecs) on outgoing heartbeat indications based on the interval // time to prevent stale heartbeats from getting to the consoles. - sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); + sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address); } } -void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) +void ManagementAgent::deleteObjectNow(const ObjectId& oid) { - ManagementObjectMap::iterator iter = managementObjects.find(oid); - if (iter == managementObjects.end()) - return; - ManagementObject* object = iter->second; - if (!object->isDeleted()) - return; + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(oid); + if (iter == managementObjects.end()) + return; + object = iter->second; + if (!object->isDeleted()) + return; + managementObjects.erase(oid); + } - // since sendBufferLH drops the userLock, don't call it until we - // are done manipulating the object. #define DNOW_BUFSIZE 2048 char msgChars[DNOW_BUFSIZE]; Buffer msgBuffer(msgChars, DNOW_BUFSIZE); @@ -1139,15 +1118,12 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) v2key << "." << instanceNameKey; } - object = 0; - managementObjects.erase(oid); + object.reset(); // object deleted, ok to drop lock now. if (publish && qmf1Support) { - uint32_t contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); + sendBuffer(msgBuffer, mExchange, v1key.str()); QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); } @@ -1160,29 +1136,26 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) string content; ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); + sendBuffer(content, "", headers, "amqp/list", v2Topic, v2key.str(), 0); QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str()); } } -void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t sequence, - uint32_t code, const string& text) +void ManagementAgent::sendCommandComplete(const string& replyToKey, uint32_t sequence, + uint32_t code, const string& text) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'z', sequence); outBuffer.putLong (code); outBuffer.putShortString (text); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid, - const string& text, uint32_t code, bool viaLocal) +void ManagementAgent::sendException(const string& rte, const string& rtk, const string& cid, + const string& text, uint32_t code, bool viaLocal) { static const string addr_exchange("qmf.default.direct"); @@ -1200,7 +1173,7 @@ void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, cons map["_values"] = values; MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); } @@ -1211,7 +1184,6 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const bool topic, int qmfVersion) { - sys::Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); if (topic && qmfVersion == 1) { @@ -1225,23 +1197,23 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // schema.# if (routingKey == "broker") { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return false; } if (routingKey.length() > 6) { if (routingKey.compare(0, 9, "agent.1.0") == 0) { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return false; } if (routingKey.compare(0, 8, "agent.1.") == 0) { - return authorizeAgentMessageLH(msg); + return authorizeAgentMessage(msg); } if (routingKey.compare(0, 7, "schema.") == 0) { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return true; } } @@ -1253,7 +1225,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // Intercept messages bound to: // "console.ind.locate.# - process these messages, and also allow them to be forwarded. if (routingKey == "console.request.agent_locate") { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return true; } @@ -1264,7 +1236,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // "<name_address>" - the broker agent's proper name // and do not forward them futher if (routingKey == "broker" || routingKey == name_address) { - dispatchAgentCommandLH(msg, routingKey == "broker"); + dispatchAgentCommand(msg, routingKey == "broker"); return false; } } @@ -1273,16 +1245,15 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, return true; } -void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) +void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) { - moveNewObjectsLH(); + moveNewObjects(); string methodName; string packageName; string className; uint8_t hash[16]; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); AclModule* acl = broker->getAcl(); string inArgs; @@ -1304,9 +1275,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (disallowAllV1Methods) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2"); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); return; } @@ -1315,9 +1284,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (i != disallowed.end()) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(i->second); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); return; } @@ -1331,30 +1298,34 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); return; } } - ManagementObjectMap::iterator iter = numericFind(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = numericFind(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (!object || object->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { + if ((object->getPackageName() != packageName) || + (object->getClassName() != className)) { outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); } else { uint32_t pos = outBuffer.getPosition(); try { - sys::Mutex::ScopedUnlock u(userLock); string outBuf; - iter->second->doMethod(methodName, inArgs, outBuf, userId); + object->doMethod(methodName, inArgs, outBuf, userId); outBuffer.putRawData(outBuf); } catch(exception& e) { outBuffer.setPosition(pos);; @@ -1364,17 +1335,15 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl } } - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk, - const string& cid, const ConnectionToken* connToken, bool viaLocal) +void ManagementAgent::handleMethodRequest (const string& body, const string& rte, const string& rtk, + const string& cid, const ConnectionToken* connToken, bool viaLocal) { - moveNewObjectsLH(); + moveNewObjects(); string methodName; Variant::Map inMap; @@ -1393,8 +1362,8 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r if ((oid = inMap.find("_object_id")) == inMap.end() || (mid = inMap.find("_method_name")) == inMap.end()) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), - Manageable::STATUS_PARAMETER_INVALID, viaLocal); + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), + Manageable::STATUS_PARAMETER_INVALID, viaLocal); return; } @@ -1412,16 +1381,22 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r inArgs = (mid->second).asMap(); } } catch(exception& e) { - sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); return; } - ManagementObjectMap::iterator iter = managementObjects.find(objId); + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } - if (iter == managementObjects.end() || iter->second->isDeleted()) { + if (!object || object->isDeleted()) { stringstream estr; estr << "No object found with ID=" << objId; - sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal); + sendException(rte, rtk, cid, estr.str(), 1, viaLocal); return; } @@ -1429,20 +1404,20 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r AclModule* acl = broker->getAcl(); DisallowedMethods::const_iterator i; - i = disallowed.find(make_pair(iter->second->getClassName(), methodName)); + i = disallowed.find(make_pair(object->getClassName(), methodName)); if (i != disallowed.end()) { - sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); + sendException(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); return; } string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); if (acl != 0) { map<acl::Property, string> params; - params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); - params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); + params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName(); + params[acl::PROP_SCHEMACLASS] = object->getClassName(); if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), Manageable::STATUS_FORBIDDEN, viaLocal); return; } @@ -1450,13 +1425,12 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r // invoke the method - QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() - << ":" << iter->second->getClassName() << " method=" << + QPID_LOG(debug, "RECV MethodRequest (v2) class=" << object->getPackageName() + << ":" << object->getClassName() << " method=" << methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs); try { - sys::Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inArgs, callMap, userId); + object->doMethod(methodName, inArgs, callMap, userId); errorCode = callMap["_status_code"].asUint32(); if (errorCode == 0) { outMap["_arguments"] = Variant::Map(); @@ -1467,62 +1441,59 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r } else error = callMap["_status_text"].asString(); } catch(exception& e) { - sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); return; } if (errorCode != 0) { - sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal); + sendException(rte, rtk, cid, error, errorCode, viaLocal); return; } MapCodec::encode(outMap, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap); } -void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleBrokerRequest (Buffer&, const string& replyToKey, uint32_t sequence) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey); encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); } -void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handlePackageQuery (Buffer&, const string& replyToKey, uint32_t sequence) { QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey); - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); - for (PackageMap::iterator pIter = packages.begin (); - pIter != packages.end (); - pIter++) { - encodeHeader (outBuffer, 'p', sequence); - encodePackageIndication (outBuffer, pIter); + sys::Mutex::ScopedLock lock(userLock); + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); + } } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - if (outLen) { - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + if (outBuffer.getPosition() > 0) { + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handlePackageInd (Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { string packageName; @@ -1530,10 +1501,11 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); findOrAddPackageLH(packageName); } -void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleClassQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { string packageName; @@ -1541,40 +1513,39 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) + typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; + std::list<_ckeyType> classes; { - typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; - std::list<_ckeyType> classes; - ClassMap &cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); - cIter != cMap.end(); - cIter++) { - if (cIter->second.hasSchema()) { - classes.push_back(make_pair(cIter->first, cIter->second.kind)); + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) + { + ClassMap &cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); + cIter++) { + if (cIter->second.hasSchema()) { + classes.push_back(make_pair(cIter->first, cIter->second.kind)); + } } } + } - while (classes.size()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'q', sequence); - encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); + while (classes.size()) { + ResizableBuffer outBuffer(qmfV1BufferSize); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << - "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); - classes.pop_front(); - } + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << + "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); + classes.pop_front(); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t) +void ManagementAgent::handleClassInd (Buffer& inBuffer, const string& replyToKey, uint32_t) { string packageName; SchemaClassKey key; @@ -1587,20 +1558,18 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = findOrAddPackageLH(packageName); ClassMap::iterator cIter = pIter->second.find(key); if (cIter == pIter->second.end() || !cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); uint32_t sequence = nextRequestSequence++; // Schema Request encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); key.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), to=" << replyToKey << " seq=" << sequence); @@ -1625,7 +1594,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } -void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) +void ManagementAgent::handleSchemaRequest(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -1636,34 +1605,32 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << rte << "/" << rtk << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); SchemaClass& classInfo = cIter->second; if (classInfo.hasSchema()) { encodeHeader(outBuffer, 's', sequence); classInfo.appendSchema(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, rte, rtk); + sendBuffer(outBuffer, rte, rtk); QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence); } else - sendCommandCompleteLH(rtk, sequence, 1, "Schema not available"); + sendCommandComplete(rtk, sequence, 1, "Schema not available"); } else - sendCommandCompleteLH(rtk, sequence, 1, "Class key not found"); + sendCommandComplete(rtk, sequence, 1, "Class key not found"); } else - sendCommandCompleteLH(rtk, sequence, 1, "Package not found"); + sendCommandComplete(rtk, sequence, 1, "Package not found"); } -void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) +void ManagementAgent::handleSchemaResponse(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -1676,6 +1643,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap& cMap = pIter->second; @@ -1690,14 +1658,11 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length); // Publish a class-indication message - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); encodeHeader(outBuffer, 'q'); encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); + sendBuffer(outBuffer, mExchange, "schema.class"); QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " to=schema.class"); } @@ -1756,7 +1721,7 @@ void ManagementAgent::deleteOrphanedAgentsLH() remoteAgents.erase(*dIter); } -void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) +void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; uint32_t requestedBrokerBank, requestedAgentBank; @@ -1764,12 +1729,14 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; - moveNewObjectsLH(); + moveNewObjects(); + + sys::Mutex::ScopedLock lock(userLock); deleteOrphanedAgentsLH(); RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent"); + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); return; } @@ -1788,7 +1755,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep agent->agentBank = assignedBank; agent->routingKey = replyToKey; agent->connectionRef = connectionRef; - agent->mgmtObject = new _qmf::Agent (this, agent.get()); + agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get())); agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); @@ -1801,25 +1768,22 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); // Send an Attach Response - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << " to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { FieldTable ft; FieldTable::ValuePtr value; - moveNewObjectsLH(); + moveNewObjects(); ft.decode(inBuffer); @@ -1832,11 +1796,17 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe return; ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = numericFind(selector); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = numericFind(selector); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (object) { + ResizableBuffer outBuffer (qmfV1BufferSize); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); @@ -1849,89 +1819,80 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe sBuf.clear(); object->writeStatistics(sBuf, true); outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); return; } string className (value->get<string>()); - std::list<ObjectId>matches; + std::list<ManagementObject::shared_ptr> matches; if (className == "memory") - qpid::sys::MemStat::loadMemInfo(memstat); + qpid::sys::MemStat::loadMemInfo(memstat.get()); if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); } // build up a set of all objects to be dumped - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName () == className) { - matches.push_back(object->getObjectId()); + { + sys::Mutex::ScopedLock lock(objectLock); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject::shared_ptr object = iter->second; + if (object->getClassName () == className) { + matches.push_back(object); + } } } - // send them (as sendBufferLH drops the userLock) - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + // send them + ResizableBuffer outBuffer (qmfV1BufferSize); while (matches.size()) { - ObjectId objId = matches.front(); - ManagementObjectMap::iterator oIter = managementObjects.find( objId ); - if (oIter != managementObjects.end()) { - ManagementObject* object = oIter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - if (!object->isDeleted()) { - string sProps, sStats; - object->writeProperties(sProps); - object->writeStatistics(sStats, true); - - size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. - if (len > MA_BUFFER_SIZE) { - QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!"); - } else { - if (outBuffer.available() < len) { // not enough room in current buffer, send it. - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock - QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); - continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. - } - encodeHeader(outBuffer, 'g', sequence); - outBuffer.putRawData(sProps); - outBuffer.putRawData(sStats); + ManagementObject::shared_ptr object = matches.front(); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + string sProps, sStats; + object->writeProperties(sProps); + object->writeStatistics(sStats, true); + + size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. + if (len > qmfV1BufferSize) { + QPID_LOG(error, "Object " << object->getObjectId() << " too large for output buffer - discarded!"); + } else { + if (outBuffer.available() < len) { // not enough room in current buffer, send it. + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. } + encodeHeader(outBuffer, 'g', sequence); + outBuffer.putRawData(sProps); + outBuffer.putRawData(sStats); } } matches.pop_front(); } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - if (outLen) { - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + if (outBuffer.getPosition() > 0) { + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) +void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) { - moveNewObjectsLH(); + moveNewObjects(); Variant::Map inMap; Variant::Map::const_iterator i; @@ -1950,17 +1911,17 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co */ i = inMap.find("_what"); if (i == inMap.end()) { - sendExceptionLH(rte, rtk, cid, "_what element missing in Query"); + sendException(rte, rtk, cid, "_what element missing in Query"); return; } if (i->second.getType() != qpid::types::VAR_STRING) { - sendExceptionLH(rte, rtk, cid, "_what element is not a string"); + sendException(rte, rtk, cid, "_what element is not a string"); return; } if (i->second.asString() != "OBJECT") { - sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); return; } @@ -1984,11 +1945,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co } if (className == "memory") - qpid::sys::MemStat::loadMemInfo(memstat); + qpid::sys::MemStat::loadMemInfo(memstat.get()); if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); } /* @@ -2000,10 +1961,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co Variant::List list_; ObjectId objId(i->second.asMap()); - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock (objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + if (object) { if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); @@ -2027,7 +1992,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co string content; ListCodec::encode(list_, content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); return; } @@ -2037,10 +2002,18 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co Variant::List _subList; unsigned int objCount = 0; - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); + ManagementObjectVector localManagementObjects; + { + sys::Mutex::ScopedLock objLock(objectLock); + std::transform(managementObjects.begin(), managementObjects.end(), + std::back_inserter(localManagementObjects), + boost::bind(&ManagementObjectMap::value_type::second, _1)); + } + + for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); + iter != localManagementObjects.end(); iter++) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = *iter; if (object->getClassName() == className && (packageName.empty() || object->getPackageName() == packageName)) { @@ -2055,7 +2028,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co object->writeTimestamps(map_); object->mapEncodeValues(values, true, true); // write both stats and properties - iter->first.mapEncode(oidMap); + object->getObjectId().mapEncode(oidMap); map_["_values"] = values; map_["_object_id"] = oidMap; @@ -2080,13 +2053,13 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co string content; while (_list.size() > 1) { ListCodec::encode(_list.front().asList(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); _list.pop_front(); QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); } headers.erase("partial"); ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); return; } @@ -2094,12 +2067,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co // Unrecognized query - Send empty message to indicate CommandComplete string content; ListCodec::encode(Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk); } -void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid) +void ManagementAgent::handleLocateRequest(const string&, const string& rte, const string& rtk, const string& cid) { QPID_LOG(debug, "RCVD AgentLocateRequest"); @@ -2117,16 +2090,17 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, co string content; MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); clientWasAdded = true; QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); } -bool ManagementAgent::authorizeAgentMessageLH(Message& msg) +bool ManagementAgent::authorizeAgentMessage(Message& msg) { - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + sys::Mutex::ScopedLock lock(userLock); + ResizableBuffer inBuffer (qmfV1BufferSize); uint32_t sequence = 0; bool methodReq = false; bool mapMsg = false; @@ -2140,7 +2114,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) // authorized or not. In this case, return true (authorized) if there is no ACL in place, // otherwise return false; // - if (msg.getContentSize() > MA_BUFFER_SIZE) + if (msg.getContentSize() > qmfV1BufferSize) return broker->getAcl() == 0; inBuffer.putRawData(msg.getContent()); @@ -2193,11 +2167,11 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) } // look up schema for object to get package and class name - + sys::Mutex::ScopedLock lock(objectLock); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { - QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " << + QPID_LOG(debug, "ManagementAgent::authorizeAgentMessage: stale object id " << objId); return false; } @@ -2256,19 +2230,16 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) cid = p->getCorrelationId(); if (mapMsg) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), - Manageable::STATUS_FORBIDDEN, false); + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + Manageable::STATUS_FORBIDDEN, false); } else { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); encodeHeader(outBuffer, 'm', sequence); outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, rte, rtk); + sendBuffer(outBuffer, rte, rtk); } QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); @@ -2280,7 +2251,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) return true; } -void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) +void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal) { string rte; string rtk; @@ -2295,10 +2266,10 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) else return; - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + ResizableBuffer inBuffer(qmfV1BufferSize); uint8_t opcode; - if (msg.getContentSize() > MA_BUFFER_SIZE) { + if (msg.getContentSize() > qmfV1BufferSize) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.getContentSize()); return; @@ -2317,39 +2288,38 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) string body; string cid; inBuffer.getRawData(body, bufferLen); + { + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } - if (p && p->hasCorrelationId()) { - cid = p->getCorrelationId(); + if (opcode == "_method_request") + return handleMethodRequest(body, rte, rtk, cid, msg.getPublisher(), viaLocal); + else if (opcode == "_query_request") + return handleGetQuery(body, rte, rtk, cid, viaLocal); + else if (opcode == "_agent_locate_request") + return handleLocateRequest(body, rte, rtk, cid); } - - if (opcode == "_method_request") - return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal); - else if (opcode == "_query_request") - return handleGetQueryLH(body, rte, rtk, cid, viaLocal); - else if (opcode == "_agent_locate_request") - return handleLocateRequestLH(body, rte, rtk, cid); - QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); return; } // old preV2 binary messages - while (inBuffer.getPosition() < bufferLen) { uint32_t sequence; if (!checkHeader(inBuffer, &opcode, &sequence)) return; - if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence); - else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence); - else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); - else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); + if (opcode == 'B') handleBrokerRequest (inBuffer, rtk, sequence); + else if (opcode == 'P') handlePackageQuery (inBuffer, rtk, sequence); + else if (opcode == 'p') handlePackageInd (inBuffer, rtk, sequence); + else if (opcode == 'Q') handleClassQuery (inBuffer, rtk, sequence); + else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence); + else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence); + else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence); + else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.getPublisher()); + else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence); + else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, msg.getPublisher()); } } @@ -2365,14 +2335,11 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string QPID_LOG (debug, "ManagementAgent added package " << name); // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'p'); encodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); + sendBuffer(outBuffer, mExchange, "schema.package"); QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package"); return result.first; @@ -2684,7 +2651,7 @@ void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { connectionRef.mapDecode(i->second.asMap()); } - mgmtObject = new _qmf::Agent(&agent, this); + mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent(&agent, this)); if ((i = map_.find("_values")) != map_.end()) { mgmtObject->mapDecodeValues(i->second.asMap()); @@ -2827,8 +2794,8 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) sys::Mutex::ScopedLock lock (userLock); - moveNewObjectsLH(); - moveDeletedObjectsLH(); + moveNewObjects(); + moveDeletedObjects(); // now copy the pending deletes into the outList for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin(); @@ -2845,15 +2812,15 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) { sys::Mutex::ScopedLock lock (userLock); + sys::Mutex::ScopedLock objLock(objectLock); // Clear out any existing deleted objects - moveNewObjectsLH(); + moveNewObjects(); pendingDeletedObjs.clear(); ManagementObjectMap::iterator i = managementObjects.begin(); // Silently drop any deleted objects left over from receiving the update. while (i != managementObjects.end()) { - ManagementObject* object = i->second; + ManagementObject::shared_ptr object = i->second; if (object->isDeleted()) { - delete object; managementObjects.erase(i++); } else ++i; @@ -2867,7 +2834,7 @@ void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) // construct a DeletedObject from a management object. -ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2) +ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2) : packageName(src->getPackageName()), className(src->getClassName()) { @@ -2943,14 +2910,17 @@ void ManagementAgent::DeletedObject::encode(std::string& toBuffer) } // Remove Deleted objects, and save for later publishing... -bool ManagementAgent::moveDeletedObjectsLH() { - typedef vector<pair<ObjectId, ManagementObject*> > DeleteList; +bool ManagementAgent::moveDeletedObjects() { + typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList; + + sys::Mutex::ScopedLock lock (objectLock); + DeleteList deleteList; for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); ++iter) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = iter->second; if (object->isDeleted()) deleteList.push_back(*iter); } @@ -2959,13 +2929,12 @@ bool ManagementAgent::moveDeletedObjectsLH() { iter != deleteList.rend(); iter++) { - ManagementObject* delObj = iter->second; + ManagementObject::shared_ptr delObj = iter->second; assert(delObj->isDeleted()); DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support)); pendingDeletedObjs[dptr->getKey()].push_back(dptr); managementObjects.erase(iter->first); - delete iter->second; } return !deleteList.empty(); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index c7e830dcf5..fba733a984 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -37,6 +37,7 @@ #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> #include <qpid/framing/ResizableBuffer.h> +#include <boost/shared_ptr.hpp> #include <memory> #include <string> #include <map> @@ -100,12 +101,12 @@ public: const std::string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); - QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0, - bool persistent = false); - QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - const std::string& key, - bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + uint64_t persistId = 0, + bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + const std::string& key, + bool persistent = false); QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); @@ -158,7 +159,7 @@ public: class DeletedObject { public: typedef boost::shared_ptr<DeletedObject> shared_ptr; - DeletedObject(ManagementObject *, bool v1, bool v2); + DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2); DeletedObject( const std::string &encoded ); ~DeletedObject() {}; void encode( std::string& toBuffer ); @@ -207,9 +208,9 @@ private: uint32_t agentBank; std::string routingKey; ObjectId connectionRef; - qmf::org::apache::qpid::broker::Agent* mgmtObject; - RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {} - ManagementObject* GetManagementObject (void) const { return mgmtObject; } + qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject; + RemoteAgent(ManagementAgent& _agent) : agent(_agent) {} + ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); void mapEncode(qpid::types::Variant::Map& _map) const; @@ -276,7 +277,7 @@ private: PackageMap packages; // - // Protected by userLock + // Protected by objectLock // ManagementObjectMap managementObjects; @@ -288,11 +289,11 @@ private: framing::Uuid uuid; // - // Lock hierarchy: If a thread needs to take both addLock and userLock, - // it MUST take userLock first, then addLock. + // Lock ordering: userLock -> addLock -> objectLock // sys::Mutex userLock; sys::Mutex addLock; + sys::Mutex objectLock; qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; @@ -335,53 +336,45 @@ private: // list of objects that have been deleted, but have yet to be published // one final time. // Indexed by a string composed of the object's package and class name. - // Protected by userLock. + // Protected by objectLock. typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; PendingDeletedObjsMap pendingDeletedObjs; -# define MA_BUFFER_SIZE 65536 - char inputBuffer[MA_BUFFER_SIZE]; - char outputBuffer[MA_BUFFER_SIZE]; - char eventBuffer[MA_BUFFER_SIZE]; - framing::ResizableBuffer msgBuffer; - // // Memory statistics object // - qmf::org::apache::qpid::broker::Memory *memstat; + qmf::org::apache::qpid::broker::Memory::shared_ptr memstat; void writeData (); void periodicProcessing (void); - void deleteObjectNowLH(const ObjectId& oid); + void deleteObjectNow(const ObjectId& oid); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendBufferLH(framing::Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey); - void sendBufferLH(framing::Buffer& buf, - uint32_t length, - const std::string& exchange, - const std::string& routingKey); - void sendBufferLH(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - const std::string& content_type, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey, - uint64_t ttl_msec = 0); - void sendBufferLH(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - const std::string& content_type, - const std::string& exchange, - const std::string& routingKey, - uint64_t ttl_msec = 0); - void moveNewObjectsLH(); - bool moveDeletedObjectsLH(); - - bool authorizeAgentMessageLH(qpid::broker::Message& msg); - void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false); + void sendBuffer(framing::Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); + void sendBuffer(framing::Buffer& buf, + const std::string& exchange, + const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + const std::string& exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void moveNewObjects(); + bool moveDeletedObjects(); + + bool authorizeAgentMessage(qpid::broker::Message& msg); + void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false); PackageMap::iterator findOrAddPackageLH(std::string name); void addClassLH(uint8_t kind, @@ -399,22 +392,22 @@ private: uint32_t allocateNewBank (); uint32_t assignBankLH (uint32_t requestedPrefix); void deleteOrphanedAgentsLH(); - void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence, - uint32_t code = 0, const std::string& text = "OK"); - void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); - void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); - void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); - void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); - void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); + void sendCommandComplete(const std::string& replyToKey, uint32_t sequence, + uint32_t code = 0, const std::string& text = "OK"); + void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); + void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); + void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); + void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); + void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); size_t validateSchema(framing::Buffer&, uint8_t kind); diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp index 02aa87f876..9c21e51a18 100644 --- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp +++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp @@ -41,751 +41,700 @@ using namespace qpid::types; namespace qpid { - namespace tests { - - namespace _qmf = qmf::org::apache::qpid::broker::mgmt::test; - namespace { - - typedef boost::shared_ptr<_qmf::TestObject> TestObjectPtr; - typedef std::vector<TestObjectPtr> TestObjectVector; - - // Instantiates a broker and its internal management agent. Provides - // factories for constructing Receivers for object indication messages. - // - class AgentFixture - { - MessagingFixture *mFix; - - public: - AgentFixture( unsigned int pubInterval=10, - bool qmfV2=false, - qpid::broker::Broker::Options opts = qpid::broker::Broker::Options()) - { - opts.enableMgmt=true; - opts.qmf2Support=qmfV2; - opts.mgmtPubInterval=pubInterval; - mFix = new MessagingFixture(opts, true); - - _qmf::TestObject::registerSelf(getBrokerAgent()); - }; - ~AgentFixture() - { - delete mFix; - }; - ::qpid::management::ManagementAgent *getBrokerAgent() { return mFix->broker->getManagementAgent(); } - Receiver createV1DataIndRcvr( const std::string package, const std::string klass ) - { - return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " - "node: {type: queue, " - "x-bindings: [{exchange: qpid.management, " - "key: 'console.obj.1.0.") - + package + std::string(".") + klass - + std::string("'}]}}")); - }; - Receiver createV2DataIndRcvr( const std::string package, const std::string klass ) - { - std::string p(package); - std::replace(p.begin(), p.end(), '.', '_'); - std::string k(klass); - std::replace(k.begin(), k.end(), '.', '_'); - - return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " - "node: {type: queue, " - "x-bindings: [{exchange: qmf.default.topic, " - "key: 'agent.ind.data.") - + p + std::string(".") + k - + std::string("'}]}}")); - }; - }; - - - // A "management object" that supports the TestObject - // - class TestManageable : public qpid::management::Manageable - { - management::ManagementObject* mgmtObj; - const std::string key; - public: - TestManageable(management::ManagementAgent *agent, std::string _key) - : key(_key) - { - _qmf::TestObject *tmp = new _qmf::TestObject(agent, this); - - // seed it with some default values... - tmp->set_string1(key); - tmp->set_bool1(true); - qpid::types::Variant::Map vMap; - vMap["one"] = qpid::types::Variant(1); - vMap["two"] = qpid::types::Variant("two"); - vMap["three"] = qpid::types::Variant("whatever"); - tmp->set_map1(vMap); - - mgmtObj = tmp; - }; - ~TestManageable() { mgmtObj = 0; /* deleted by agent on shutdown */ }; - management::ManagementObject* GetManagementObject() const { return mgmtObj; }; - static void validateTestObjectProperties(_qmf::TestObject& to) - { - // verify the default values are as expected. We don't check 'string1', - // as it is the object key, and is unique for each object (no default value). - BOOST_CHECK(to.get_bool1() == true); - BOOST_CHECK(to.get_map1().size() == 3); - qpid::types::Variant::Map mappy = to.get_map1(); - BOOST_CHECK(1 == (unsigned int)mappy["one"]); - BOOST_CHECK(mappy["two"].asString() == std::string("two")); - BOOST_CHECK(mappy["three"].asString() == std::string("whatever")); - }; - }; - - - // decode a V1 Content Indication message - // - void decodeV1ObjectUpdates(const Message& inMsg, TestObjectVector& objs, const size_t objLen) - { - const size_t MAX_BUFFER_SIZE=65536; - char tmp[MAX_BUFFER_SIZE]; - - objs.clear(); - - BOOST_CHECK(inMsg.getContent().size() <= MAX_BUFFER_SIZE); - - ::memcpy(tmp, inMsg.getContent().data(), inMsg.getContent().size()); - Buffer buf(tmp, inMsg.getContent().size()); - - while (buf.available() > 8) { // 8 == qmf v1 header size - BOOST_CHECK_EQUAL(buf.getOctet(), 'A'); - BOOST_CHECK_EQUAL(buf.getOctet(), 'M'); - BOOST_CHECK_EQUAL(buf.getOctet(), '2'); - BOOST_CHECK_EQUAL(buf.getOctet(), 'c'); // opcode == content indication - // @@todo: kag: how do we skip 'i' entries??? - buf.getLong(); // ignore sequence - - std::string str1; // decode content body as string - buf.getRawData(str1, objLen); - - TestObjectPtr fake(new _qmf::TestObject(0,0)); - fake->readProperties( str1 ); - objs.push_back(fake); - } - } - - - // decode a V2 Content Indication message - // - void decodeV2ObjectUpdates(const qpid::messaging::Message& inMsg, TestObjectVector& objs) - { - objs.clear(); +namespace tests { + +namespace _qmf = qmf::org::apache::qpid::broker::mgmt::test; +namespace { + +typedef boost::shared_ptr<_qmf::TestObject> TestObjectPtr; +typedef std::vector<TestObjectPtr> TestObjectVector; + +// Instantiates a broker and its internal management agent. Provides +// factories for constructing Receivers for object indication messages. +// +class AgentFixture +{ + MessagingFixture *mFix; + + public: + AgentFixture( unsigned int pubInterval=10, + bool qmfV2=false, + qpid::broker::Broker::Options opts = qpid::broker::Broker::Options()) + { + opts.enableMgmt=true; + opts.qmf2Support=qmfV2; + opts.mgmtPubInterval=pubInterval; + mFix = new MessagingFixture(opts, true); + + _qmf::TestObject::registerSelf(getBrokerAgent()); + }; + ~AgentFixture() + { + delete mFix; + }; + ::qpid::management::ManagementAgent *getBrokerAgent() { return mFix->broker->getManagementAgent(); } + Receiver createV1DataIndRcvr( const std::string package, const std::string klass ) + { + return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " + "node: {type: queue, " + "x-bindings: [{exchange: qpid.management, " + "key: 'console.obj.1.0.") + + package + std::string(".") + klass + + std::string("'}]}}")); + }; + Receiver createV2DataIndRcvr( const std::string package, const std::string klass ) + { + std::string p(package); + std::replace(p.begin(), p.end(), '.', '_'); + std::string k(klass); + std::replace(k.begin(), k.end(), '.', '_'); + + return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " + "node: {type: queue, " + "x-bindings: [{exchange: qmf.default.topic, " + "key: 'agent.ind.data.") + + p + std::string(".") + k + + std::string("'}]}}")); + }; +}; + + +// A "management object" that supports the TestObject +// +class TestManageable : public qpid::management::Manageable +{ + management::ManagementObject::shared_ptr mgmtObj; + const std::string key; + public: + TestManageable(management::ManagementAgent *agent, std::string _key) + : key(_key) + { + _qmf::TestObject::shared_ptr tmp(new _qmf::TestObject(agent, this)); + + // seed it with some default values... + tmp->set_string1(key); + tmp->set_bool1(true); + qpid::types::Variant::Map vMap; + vMap["one"] = qpid::types::Variant(1); + vMap["two"] = qpid::types::Variant("two"); + vMap["three"] = qpid::types::Variant("whatever"); + tmp->set_map1(vMap); + + mgmtObj = tmp; + }; + ~TestManageable() { mgmtObj.reset(); } + management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObj; }; + static void validateTestObjectProperties(_qmf::TestObject& to) + { + // verify the default values are as expected. We don't check 'string1', + // as it is the object key, and is unique for each object (no default value). + BOOST_CHECK(to.get_bool1() == true); + BOOST_CHECK(to.get_map1().size() == 3); + qpid::types::Variant::Map mappy = to.get_map1(); + BOOST_CHECK(1 == (unsigned int)mappy["one"]); + BOOST_CHECK(mappy["two"].asString() == std::string("two")); + BOOST_CHECK(mappy["three"].asString() == std::string("whatever")); + }; +}; + + +// decode a V1 Content Indication message +// +void decodeV1ObjectUpdates(const Message& inMsg, TestObjectVector& objs, const size_t objLen) +{ + const size_t MAX_BUFFER_SIZE=65536; + char tmp[MAX_BUFFER_SIZE]; + + objs.clear(); + + BOOST_CHECK(inMsg.getContent().size() <= MAX_BUFFER_SIZE); + + ::memcpy(tmp, inMsg.getContent().data(), inMsg.getContent().size()); + Buffer buf(tmp, inMsg.getContent().size()); + + while (buf.available() > 8) { // 8 == qmf v1 header size + BOOST_CHECK_EQUAL(buf.getOctet(), 'A'); + BOOST_CHECK_EQUAL(buf.getOctet(), 'M'); + BOOST_CHECK_EQUAL(buf.getOctet(), '2'); + BOOST_CHECK_EQUAL(buf.getOctet(), 'c'); // opcode == content indication + // @@todo: kag: how do we skip 'i' entries??? + buf.getLong(); // ignore sequence + + std::string str1; // decode content body as string + buf.getRawData(str1, objLen); + + TestObjectPtr fake(new _qmf::TestObject(0,0)); + fake->readProperties( str1 ); + objs.push_back(fake); + } +} - BOOST_CHECK_EQUAL(inMsg.getContentType(), std::string("amqp/list")); - const ::qpid::types::Variant::Map& m = inMsg.getProperties(); - Variant::Map::const_iterator iter = m.find(std::string("qmf.opcode")); - BOOST_CHECK(iter != m.end()); - BOOST_CHECK_EQUAL(iter->second.asString(), std::string("_data_indication")); +// decode a V2 Content Indication message +// +void decodeV2ObjectUpdates(const qpid::messaging::Message& inMsg, TestObjectVector& objs) +{ + objs.clear(); - Variant::List vList; - ::qpid::amqp_0_10::ListCodec::decode(inMsg.getContent(), vList); + BOOST_CHECK_EQUAL(inMsg.getContentType(), std::string("amqp/list")); - for (Variant::List::iterator lIter = vList.begin(); lIter != vList.end(); lIter++) { - TestObjectPtr fake(new _qmf::TestObject(0,0)); - fake->readTimestamps(lIter->asMap()); - fake->mapDecodeValues((lIter->asMap())["_values"].asMap()); - objs.push_back(fake); - } - } - } + const ::qpid::types::Variant::Map& m = inMsg.getProperties(); + Variant::Map::const_iterator iter = m.find(std::string("qmf.opcode")); + BOOST_CHECK(iter != m.end()); + BOOST_CHECK_EQUAL(iter->second.asString(), std::string("_data_indication")); - QPID_AUTO_TEST_SUITE(BrokerMgmtAgent) + Variant::List vList; + ::qpid::amqp_0_10::ListCodec::decode(inMsg.getContent(), vList); - // verify that an object that is added to the broker's management database is - // published correctly. Furthermore, verify that it is published once after - // it has been deleted. - // - QPID_AUTO_TEST_CASE(v1ObjPublish) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); + for (Variant::List::iterator lIter = vList.begin(); lIter != vList.end(); lIter++) { + TestObjectPtr fake(new _qmf::TestObject(0,0)); + fake->readTimestamps(lIter->asMap()); + fake->mapDecodeValues((lIter->asMap())["_values"].asMap()); + objs.push_back(fake); + } +} +} - // create a manageable test object - TestManageable *tm = new TestManageable(agent, std::string("obj1")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); +QPID_AUTO_TEST_SUITE(BrokerMgmtAgent) - Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); +// verify that an object that is added to the broker's management database is +// published correctly. Furthermore, verify that it is published once after +// it has been deleted. +// +QPID_AUTO_TEST_CASE(v1ObjPublish) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); - agent->addObject(tm->GetManagementObject(), 1); + // create a manageable test object + TestManageable *tm = new TestManageable(agent, std::string("obj1")); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); - // wait for the object to be published - Message m1; - BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); + Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + agent->addObject(tm->GetManagementObject(), 1); - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + // wait for the object to be published + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); - TestManageable::validateTestObjectProperties(**oIter); + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); // not deleted - } + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - // destroy the object + TestManageable::validateTestObjectProperties(**oIter); - tm->GetManagementObject()->resourceDestroy(); + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); // not deleted + } - // wait for the deleted object to be published + // destroy the object - bool isDeleted = false; - while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { + tm->GetManagementObject()->resourceDestroy(); - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // wait for the deleted object to be published - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + bool isDeleted = false; + while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { - TestManageable::validateTestObjectProperties(**oIter); + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - isDeleted = true; - } - } + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - BOOST_CHECK(isDeleted); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; - delete tm; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + isDeleted = true; } + } - // Repeat the previous test, but with V2-based object support - // - QPID_AUTO_TEST_CASE(v2ObjPublish) - { - AgentFixture* fix = new AgentFixture(3, true); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); + BOOST_CHECK(isDeleted); - TestManageable *tm = new TestManageable(agent, std::string("obj2")); + r1.close(); + delete fix; + delete tm; +} - Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#"); +// Repeat the previous test, but with V2-based object support +// +QPID_AUTO_TEST_CASE(v2ObjPublish) +{ + AgentFixture* fix = new AgentFixture(3, true); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); - agent->addObject(tm->GetManagementObject(), "testobj-1"); + TestManageable *tm = new TestManageable(agent, std::string("obj2")); - // wait for the object to be published - Message m1; - BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); + Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#"); - TestObjectVector objs; - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); + agent->addObject(tm->GetManagementObject(), "testobj-1"); - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + // wait for the object to be published + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); - TestManageable::validateTestObjectProperties(**oIter); + TestObjectVector objs; + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); - } + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - // destroy the object + TestManageable::validateTestObjectProperties(**oIter); - tm->GetManagementObject()->resourceDestroy(); + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); + } - // wait for the deleted object to be published + // destroy the object - bool isDeleted = false; - while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { + tm->GetManagementObject()->resourceDestroy(); - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); + // wait for the deleted object to be published - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + bool isDeleted = false; + while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { - TestManageable::validateTestObjectProperties(**oIter); + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - isDeleted = true; - } - } + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - BOOST_CHECK(isDeleted); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; - delete tm; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + isDeleted = true; } + } + BOOST_CHECK(isDeleted); - // verify that a deleted object is exported correctly using the - // exportDeletedObjects() method. V1 testcase. - // - QPID_AUTO_TEST_CASE(v1ExportDelObj) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - // create a manageable test object - TestManageable *tm = new TestManageable(agent, std::string("myObj")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); - - Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + r1.close(); + delete fix; + delete tm; +} - agent->addObject(tm->GetManagementObject(), 1); - // wait for the object to be published - Message m1; - BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); +// verify that a deleted object is exported correctly using the +// exportDeletedObjects() method. V1 testcase. +// +QPID_AUTO_TEST_CASE(v1ExportDelObj) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // create a manageable test object + TestManageable *tm = new TestManageable(agent, std::string("myObj")); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); - // destroy the object, then immediately export (before the next poll cycle) + Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - tm->GetManagementObject()->resourceDestroy(); - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 1); + agent->addObject(tm->GetManagementObject(), 1); - // wait for the deleted object to be published + // wait for the object to be published + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); - bool isDeleted = false; - while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // destroy the object, then immediately export (before the next poll cycle) - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + tm->GetManagementObject()->resourceDestroy(); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 1); - TestManageable::validateTestObjectProperties(**oIter); + // wait for the deleted object to be published - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - isDeleted = true; - } - } + bool isDeleted = false; + while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { - BOOST_CHECK(isDeleted); + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - // verify there are no deleted objects to export now. + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 0); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; - delete tm; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + isDeleted = true; } + } + BOOST_CHECK(isDeleted); - // verify that a deleted object is imported correctly using the - // importDeletedObjects() method. V1 testcase. - // - QPID_AUTO_TEST_CASE(v1ImportDelObj) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); + // verify there are no deleted objects to export now. - // create a manageable test object - TestManageable *tm = new TestManageable(agent, std::string("anObj")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 0); - Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + r1.close(); + delete fix; + delete tm; +} - agent->addObject(tm->GetManagementObject(), 1); - // wait for the object to be published - Message m1; - BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); +// verify that a deleted object is imported correctly using the +// importDeletedObjects() method. V1 testcase. +// +QPID_AUTO_TEST_CASE(v1ImportDelObj) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // create a manageable test object + TestManageable *tm = new TestManageable(agent, std::string("anObj")); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); - // destroy the object, then immediately export (before the next poll cycle) + Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - tm->GetManagementObject()->resourceDestroy(); - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 1); + agent->addObject(tm->GetManagementObject(), 1); - // destroy the broker, and reinistantiate a new one without populating it - // with a TestObject. + // wait for the object to be published + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); - r1.close(); - delete fix; - delete tm; // should no longer be necessary + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - fix = new AgentFixture(3); - r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent = fix->getBrokerAgent(); - agent->importDeletedObjects( delObjs ); + // destroy the object, then immediately export (before the next poll cycle) - // wait for the deleted object to be published + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + tm->GetManagementObject()->resourceDestroy(); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 1); - bool isDeleted = false; - while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { + // destroy the broker, and reinistantiate a new one without populating it + // with a TestObject. - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + r1.close(); + delete fix; + delete tm; // should no longer be necessary - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + fix = new AgentFixture(3); + r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + agent = fix->getBrokerAgent(); + agent->importDeletedObjects( delObjs ); - TestManageable::validateTestObjectProperties(**oIter); + // wait for the deleted object to be published - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - isDeleted = true; - } - } + bool isDeleted = false; + while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { - BOOST_CHECK(isDeleted); + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - // verify there are no deleted objects to export now. + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 0); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + isDeleted = true; } + } + BOOST_CHECK(isDeleted); - // verify that an object that is added and deleted prior to the - // first poll cycle is accounted for by the export - // - QPID_AUTO_TEST_CASE(v1ExportFastDelObj) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); + // verify there are no deleted objects to export now. - // create a manageable test object - TestManageable *tm = new TestManageable(agent, std::string("objectifyMe")); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 0); - // add, then immediately delete and export the object... + r1.close(); + delete fix; +} - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - agent->addObject(tm->GetManagementObject(), 999); - tm->GetManagementObject()->resourceDestroy(); - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 1); - delete fix; - delete tm; - } +// verify that an object that is added and deleted prior to the +// first poll cycle is accounted for by the export +// +QPID_AUTO_TEST_CASE(v1ExportFastDelObj) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + // create a manageable test object + TestManageable *tm = new TestManageable(agent, std::string("objectifyMe")); - // Verify that we can export and import multiple deleted objects correctly. - // - QPID_AUTO_TEST_CASE(v1ImportMultiDelObj) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - - // populate the agent with multiple test objects - const size_t objCount = 50; - std::vector<TestManageable *> tmv; - uint32_t objLen; - - for (size_t i = 0; i < objCount; i++) { - std::stringstream key; - key << "testobj-" << std::setfill('x') << std::setw(4) << i; - // (no, seriously, I didn't just do that.) - // Note well: we have to keep the key string length EXACTLY THE SAME - // FOR ALL OBJECTS, so objLen will be the same. Otherwise the - // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length). - TestManageable *tm = new TestManageable(agent, key.str()); - objLen = tm->GetManagementObject()->writePropertiesSize(); - agent->addObject(tm->GetManagementObject(), i + 1); - tmv.push_back(tm); - } + // add, then immediately delete and export the object... - // wait for the objects to be published - Message m1; - uint32_t msgCount = 0; - while(r1.fetch(m1, Duration::SECOND * 6)) { - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - msgCount += objs.size(); - } + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + agent->addObject(tm->GetManagementObject(), 999); + tm->GetManagementObject()->resourceDestroy(); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 1); - BOOST_CHECK_EQUAL(msgCount, objCount); + delete fix; + delete tm; +} - // destroy some of the objects, then immediately export (before the next poll cycle) - uint32_t delCount = 0; - for (size_t i = 0; i < objCount; i += 2) { - tmv[i]->GetManagementObject()->resourceDestroy(); - delCount++; - } +// Verify that we can export and import multiple deleted objects correctly. +// +QPID_AUTO_TEST_CASE(v1ImportMultiDelObj) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + + Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + + // populate the agent with multiple test objects + const size_t objCount = 50; + std::vector<TestManageable *> tmv; + uint32_t objLen; + + for (size_t i = 0; i < objCount; i++) { + std::stringstream key; + key << "testobj-" << std::setfill('x') << std::setw(4) << i; + // (no, seriously, I didn't just do that.) + // Note well: we have to keep the key string length EXACTLY THE SAME + // FOR ALL OBJECTS, so objLen will be the same. Otherwise the + // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length). + TestManageable *tm = new TestManageable(agent, key.str()); + objLen = tm->GetManagementObject()->writePropertiesSize(); + agent->addObject(tm->GetManagementObject(), i + 1); + tmv.push_back(tm); + } - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK_EQUAL(delObjs.size(), delCount); + // wait for the objects to be published + Message m1; + uint32_t msgCount = 0; + while(r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + msgCount += objs.size(); + } - // destroy the broker, and reinistantiate a new one without populating it - // with TestObjects. + BOOST_CHECK_EQUAL(msgCount, objCount); - r1.close(); - delete fix; - while (tmv.size()) { - delete tmv.back(); - tmv.pop_back(); - } + // destroy some of the objects, then immediately export (before the next poll cycle) - fix = new AgentFixture(3); - r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent = fix->getBrokerAgent(); - agent->importDeletedObjects( delObjs ); + uint32_t delCount = 0; + for (size_t i = 0; i < objCount; i += 2) { + tmv[i]->GetManagementObject()->resourceDestroy(); + delCount++; + } - // wait for the deleted object to be published, verify the count + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK_EQUAL(delObjs.size(), delCount); - uint32_t countDels = 0; - while (r1.fetch(m1, Duration::SECOND * 6)) { - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // destroy the broker, and reinistantiate a new one without populating it + // with TestObjects. - - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + r1.close(); + delete fix; + while (tmv.size()) { + delete tmv.back(); + tmv.pop_back(); + } - TestManageable::validateTestObjectProperties(**oIter); + fix = new AgentFixture(3); + r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + agent = fix->getBrokerAgent(); + agent->importDeletedObjects( delObjs ); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - countDels++; - } - } + // wait for the deleted object to be published, verify the count - // make sure we get the correct # of deleted objects - BOOST_CHECK_EQUAL(countDels, delCount); + uint32_t countDels = 0; + while (r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - // verify there are no deleted objects to export now. + + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 0); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + countDels++; } + } - // Verify that we can export and import multiple deleted objects correctly. - // QMF V2 variant - QPID_AUTO_TEST_CASE(v2ImportMultiDelObj) - { - AgentFixture* fix = new AgentFixture(3, true); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - Receiver r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - - // populate the agent with multiple test objects - const size_t objCount = 50; - std::vector<TestManageable *> tmv; - - for (size_t i = 0; i < objCount; i++) { - std::stringstream key; - key << "testobj-" << i; - TestManageable *tm = new TestManageable(agent, key.str()); - if (tm->GetManagementObject()->writePropertiesSize()) {} - agent->addObject(tm->GetManagementObject(), key.str()); - tmv.push_back(tm); - } + // make sure we get the correct # of deleted objects + BOOST_CHECK_EQUAL(countDels, delCount); - // wait for the objects to be published - Message m1; - uint32_t msgCount = 0; - while(r1.fetch(m1, Duration::SECOND * 6)) { - TestObjectVector objs; - decodeV2ObjectUpdates(m1, objs); - msgCount += objs.size(); - } + // verify there are no deleted objects to export now. - BOOST_CHECK_EQUAL(msgCount, objCount); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 0); - // destroy some of the objects, then immediately export (before the next poll cycle) + r1.close(); + delete fix; +} - uint32_t delCount = 0; - for (size_t i = 0; i < objCount; i += 2) { - tmv[i]->GetManagementObject()->resourceDestroy(); - delCount++; - } +// Verify that we can export and import multiple deleted objects correctly. +// QMF V2 variant +QPID_AUTO_TEST_CASE(v2ImportMultiDelObj) +{ + AgentFixture* fix = new AgentFixture(3, true); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + + Receiver r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + + // populate the agent with multiple test objects + const size_t objCount = 50; + std::vector<TestManageable *> tmv; + + for (size_t i = 0; i < objCount; i++) { + std::stringstream key; + key << "testobj-" << i; + TestManageable *tm = new TestManageable(agent, key.str()); + if (tm->GetManagementObject()->writePropertiesSize()) {} + agent->addObject(tm->GetManagementObject(), key.str()); + tmv.push_back(tm); + } - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK_EQUAL(delObjs.size(), delCount); + // wait for the objects to be published + Message m1; + uint32_t msgCount = 0; + while(r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV2ObjectUpdates(m1, objs); + msgCount += objs.size(); + } - // destroy the broker, and reinistantiate a new one without populating it - // with TestObjects. + BOOST_CHECK_EQUAL(msgCount, objCount); - r1.close(); - delete fix; - while (tmv.size()) { - delete tmv.back(); - tmv.pop_back(); - } + // destroy some of the objects, then immediately export (before the next poll cycle) - fix = new AgentFixture(3, true); - r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent = fix->getBrokerAgent(); - agent->importDeletedObjects( delObjs ); + uint32_t delCount = 0; + for (size_t i = 0; i < objCount; i += 2) { + tmv[i]->GetManagementObject()->resourceDestroy(); + delCount++; + } - // wait for the deleted object to be published, verify the count + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK_EQUAL(delObjs.size(), delCount); - uint32_t countDels = 0; - while (r1.fetch(m1, Duration::SECOND * 6)) { - TestObjectVector objs; - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); + // destroy the broker, and reinistantiate a new one without populating it + // with TestObjects. - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + r1.close(); + delete fix; + while (tmv.size()) { + delete tmv.back(); + tmv.pop_back(); + } - TestManageable::validateTestObjectProperties(**oIter); + fix = new AgentFixture(3, true); + r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + agent = fix->getBrokerAgent(); + agent->importDeletedObjects( delObjs ); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - countDels++; - } - } + // wait for the deleted object to be published, verify the count - // make sure we get the correct # of deleted objects - BOOST_CHECK_EQUAL(countDels, delCount); + uint32_t countDels = 0; + while (r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); - // verify there are no deleted objects to export now. + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 0); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + countDels++; } + } - // See QPID-2997 - QPID_AUTO_TEST_CASE(v2RapidRestoreObj) - { - AgentFixture* fix = new AgentFixture(3, true); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - // two objects, same ObjID - TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); - TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); - - Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); - - // add, then immediately delete and re-add a copy of the object - agent->addObject(tm1->GetManagementObject(), "testobj-1"); - tm1->GetManagementObject()->resourceDestroy(); - agent->addObject(tm2->GetManagementObject(), "testobj-1"); - - // expect: a delete notification, then an update notification - TestObjectVector objs; - bool isDeleted = false; - bool isAdvertised = false; - size_t count = 0; - Message m1; - while (r1.fetch(m1, Duration::SECOND * 6)) { - - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); - - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - count++; - TestManageable::validateTestObjectProperties(**oIter); - - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) { - isDeleted = true; - BOOST_CHECK(isAdvertised == false); // delete must be first - } else { - isAdvertised = true; - BOOST_CHECK(isDeleted == true); // delete must be first - } - } - } + // make sure we get the correct # of deleted objects + BOOST_CHECK_EQUAL(countDels, delCount); - BOOST_CHECK(isDeleted); - BOOST_CHECK(isAdvertised); - BOOST_CHECK(count == 2); + // verify there are no deleted objects to export now. - r1.close(); - delete fix; - delete tm1; - delete tm2; - } + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 0); - // See QPID-2997 - QPID_AUTO_TEST_CASE(v2DuplicateErrorObj) - { - AgentFixture* fix = new AgentFixture(3, true); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - // turn off the expected error log message - qpid::log::Options logOpts; - logOpts.selectors.clear(); - logOpts.selectors.push_back("critical+"); - qpid::log::Logger::instance().configure(logOpts); - - // two objects, same ObjID - TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); - TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); - // Keep a pointer to the ManagementObject. This test simulates a user-caused error - // case (duplicate objects) where the broker has no choice but to leak a management - // object (safest assumption). To prevent valgrind from flagging this leak, we - // manually clean up the object at the end of the test. - management::ManagementObject *save = tm2->GetManagementObject(); - - Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); - - // add, then immediately delete and re-add a copy of the object - agent->addObject(tm1->GetManagementObject(), "testobj-1"); - agent->addObject(tm2->GetManagementObject(), "testobj-1"); - - TestObjectVector objs; - size_t count = 0; - Message m1; - while (r1.fetch(m1, Duration::SECOND * 6)) { - - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); - - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - count++; - TestManageable::validateTestObjectProperties(**oIter); - } + r1.close(); + delete fix; +} + +// See QPID-2997 +QPID_AUTO_TEST_CASE(v2RapidRestoreObj) +{ + AgentFixture* fix = new AgentFixture(3, true); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + + // two objects, same ObjID + TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); + TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); + + Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); + + // add, then immediately delete and re-add a copy of the object + agent->addObject(tm1->GetManagementObject(), "testobj-1"); + tm1->GetManagementObject()->resourceDestroy(); + agent->addObject(tm2->GetManagementObject(), "testobj-1"); + + // expect: a delete notification, then an update notification + TestObjectVector objs; + bool isDeleted = false; + bool isAdvertised = false; + size_t count = 0; + Message m1; + while (r1.fetch(m1, Duration::SECOND * 6)) { + + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); + + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + count++; + TestManageable::validateTestObjectProperties(**oIter); + + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) { + isDeleted = true; + BOOST_CHECK(isAdvertised == false); // delete must be first + } else { + isAdvertised = true; + BOOST_CHECK(isDeleted == true); // delete must be first } + } + } - BOOST_CHECK(count == 1); // only one should be accepted. + BOOST_CHECK(isDeleted); + BOOST_CHECK(isAdvertised); + BOOST_CHECK(count == 2); - r1.close(); - delete fix; - delete tm1; - delete tm2; - delete save; - } + r1.close(); + delete fix; + delete tm1; + delete tm2; +} - QPID_AUTO_TEST_SUITE_END() - } +QPID_AUTO_TEST_SUITE_END() +} } diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp index e6010a8e00..d538a8181c 100644 --- a/qpid/cpp/src/tests/testagent.cpp +++ b/qpid/cpp/src/tests/testagent.cpp @@ -59,7 +59,7 @@ class CoreClass : public Manageable { string name; ManagementAgent* agent; - _qmf::Parent* mgmtObject; + _qmf::Parent::shared_ptr mgmtObject; std::vector<ChildClass*> children; Mutex vectorLock; @@ -68,7 +68,7 @@ public: CoreClass(ManagementAgent* agent, string _name); ~CoreClass() { mgmtObject->resourceDestroy(); } - ManagementObject* GetManagementObject(void) const + ManagementObject::shared_ptr GetManagementObject(void) const { return mgmtObject; } void doLoop(); @@ -78,14 +78,14 @@ public: class ChildClass : public Manageable { string name; - _qmf::Child* mgmtObject; + _qmf::Child::shared_ptr mgmtObject; public: ChildClass(ManagementAgent* agent, CoreClass* parent, string name); ~ChildClass() { mgmtObject->resourceDestroy(); } - ManagementObject* GetManagementObject(void) const + ManagementObject::shared_ptr GetManagementObject(void) const { return mgmtObject; } void doWork() @@ -97,9 +97,9 @@ public: CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) { static uint64_t persistId = 0x111222333444555LL; - mgmtObject = new _qmf::Parent(agent, this, name); + mgmtObject = _qmf::Parent::shared_ptr(new _qmf::Parent(agent, this, name)); - agent->addObject(mgmtObject, persistId++); + agent->addObject(mgmtObject.get(), persistId++); mgmtObject->set_state("IDLE"); } @@ -146,9 +146,9 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) { - mgmtObject = new _qmf::Child(agent, this, parent, name); + mgmtObject = _qmf::Child::shared_ptr(new _qmf::Child(agent, this, parent, name)); - agent->addObject(mgmtObject); + agent->addObject(mgmtObject.get()); } |
